dedupe: use hard links to dedupe blobs

As the number of repos and layers increases, the greater the probability
that layers are duplicated. We dedupe using hard links when content is
the same. This is intended to be purely a storage layer optimization.
Access control when available is orthogonal this optimization.

Add a durable cache to help speed up layer lookups.

Update README.

Add more unit tests.
This commit is contained in:
Ramkumar Chinchani
2020-02-17 13:57:15 -08:00
parent 365145d2cd
commit 25f5a45296
13 changed files with 767 additions and 88 deletions
+7 -2
View File
@@ -7,6 +7,7 @@ import (
"io/ioutil"
"net"
"net/http"
"os"
"github.com/anuvu/zot/errors"
"github.com/anuvu/zot/pkg/log"
@@ -41,12 +42,16 @@ func (c *Controller) Run() error {
engine.Use(log.SessionLogger(c.Log), handlers.RecoveryHandler(handlers.RecoveryLogger(c.Log),
handlers.PrintRecoveryStack(false)))
c.ImageStore = storage.NewImageStore(c.Config.Storage.RootDirectory, c.Log)
if c.ImageStore == nil {
// we can't proceed without at least a image store
os.Exit(1)
}
c.Router = engine
c.Router.UseEncodedPath()
_ = NewRouteHandler(c)
c.ImageStore = storage.NewImageStore(c.Config.Storage.RootDirectory, c.Log)
addr := fmt.Sprintf("%s:%s", c.Config.HTTP.Address, c.Config.HTTP.Port)
server := &http.Server{Addr: addr, Handler: c.Router}
c.Server = server
+6 -8
View File
@@ -20,7 +20,6 @@ import (
"sort"
"strconv"
"strings"
"sync"
_ "github.com/anuvu/zot/docs" // nolint (golint) - as required by swaggo
"github.com/anuvu/zot/errors"
@@ -41,12 +40,11 @@ const (
)
type RouteHandler struct {
c *Controller
blobLock sync.RWMutex
c *Controller
}
func NewRouteHandler(c *Controller) *RouteHandler {
rh := &RouteHandler{c: c, blobLock: sync.RWMutex{}}
rh := &RouteHandler{c: c}
rh.SetupRoutes()
return rh
@@ -56,9 +54,9 @@ func NewRouteHandler(c *Controller) *RouteHandler {
func (rh *RouteHandler) blobRLockWrapper(f func(w http.ResponseWriter,
r *http.Request)) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
rh.blobLock.RLock()
rh.c.ImageStore.RLock()
f(w, r)
rh.blobLock.RUnlock()
rh.c.ImageStore.RUnlock()
}
}
@@ -66,9 +64,9 @@ func (rh *RouteHandler) blobRLockWrapper(f func(w http.ResponseWriter,
func (rh *RouteHandler) blobLockWrapper(f func(w http.ResponseWriter,
r *http.Request)) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
rh.blobLock.Lock()
rh.c.ImageStore.Lock()
f(w, r)
rh.blobLock.Unlock()
rh.c.ImageStore.Unlock()
}
}
+11 -11
View File
@@ -135,7 +135,7 @@ func CheckWorkflows(t *testing.T, config *compliance.Config) {
}
// without a "?digest=<>" should fail
content := []byte("this is a blob")
content := []byte("this is a blob1")
digest := godigest.FromBytes(content)
So(digest, ShouldNotBeNil)
resp, err = resty.R().Put(loc)
@@ -172,7 +172,7 @@ func CheckWorkflows(t *testing.T, config *compliance.Config) {
Convey("Monolithic blob upload with body", func() {
Print("\nMonolithic blob upload")
// create content
content := []byte("this is a blob")
content := []byte("this is a blob2")
digest := godigest.FromBytes(content)
So(digest, ShouldNotBeNil)
// setting invalid URL params should fail
@@ -228,7 +228,7 @@ func CheckWorkflows(t *testing.T, config *compliance.Config) {
}
// without a "?digest=<>" should fail
content := []byte("this is a blob")
content := []byte("this is a blob3")
digest := godigest.FromBytes(content)
So(digest, ShouldNotBeNil)
resp, err = resty.R().Put(loc)
@@ -271,7 +271,7 @@ func CheckWorkflows(t *testing.T, config *compliance.Config) {
So(loc, ShouldNotBeEmpty)
var buf bytes.Buffer
chunk1 := []byte("this is the first chunk")
chunk1 := []byte("this is the first chunk1")
n, err := buf.Write(chunk1)
So(n, ShouldEqual, len(chunk1))
So(err, ShouldBeNil)
@@ -299,7 +299,7 @@ func CheckWorkflows(t *testing.T, config *compliance.Config) {
So(resp.StatusCode(), ShouldEqual, 416)
So(resp.String(), ShouldNotBeEmpty)
chunk2 := []byte("this is the second chunk")
chunk2 := []byte("this is the second chunk1")
n, err = buf.Write(chunk2)
So(n, ShouldEqual, len(chunk2))
So(err, ShouldBeNil)
@@ -339,7 +339,7 @@ func CheckWorkflows(t *testing.T, config *compliance.Config) {
So(loc, ShouldNotBeEmpty)
var buf bytes.Buffer
chunk1 := []byte("this is the first chunk")
chunk1 := []byte("this is the first chunk2")
n, err := buf.Write(chunk1)
So(n, ShouldEqual, len(chunk1))
So(err, ShouldBeNil)
@@ -367,7 +367,7 @@ func CheckWorkflows(t *testing.T, config *compliance.Config) {
So(resp.StatusCode(), ShouldEqual, 416)
So(resp.String(), ShouldNotBeEmpty)
chunk2 := []byte("this is the second chunk")
chunk2 := []byte("this is the second chunk2")
n, err = buf.Write(chunk2)
So(n, ShouldEqual, len(chunk2))
So(err, ShouldBeNil)
@@ -422,7 +422,7 @@ func CheckWorkflows(t *testing.T, config *compliance.Config) {
loc := Location(baseURL, resp)
So(loc, ShouldNotBeEmpty)
content := []byte("this is a blob")
content := []byte("this is a blob4")
digest := godigest.FromBytes(content)
So(digest, ShouldNotBeNil)
// monolithic blob upload
@@ -461,7 +461,7 @@ func CheckWorkflows(t *testing.T, config *compliance.Config) {
resp, err = resty.R().Get(loc)
So(err, ShouldBeNil)
So(resp.StatusCode(), ShouldEqual, 204)
content := []byte("this is a blob")
content := []byte("this is a blob5")
digest := godigest.FromBytes(content)
So(digest, ShouldNotBeNil)
// monolithic blob upload: success
@@ -507,7 +507,7 @@ func CheckWorkflows(t *testing.T, config *compliance.Config) {
So(d, ShouldNotBeEmpty)
So(d, ShouldEqual, digest.String())
content = []byte("this is a blob")
content = []byte("this is a blob5")
digest = godigest.FromBytes(content)
So(digest, ShouldNotBeNil)
// create a manifest with same blob but a different tag
@@ -611,7 +611,7 @@ func CheckWorkflows(t *testing.T, config *compliance.Config) {
resp, err = resty.R().Get(loc)
So(err, ShouldBeNil)
So(resp.StatusCode(), ShouldEqual, 204)
content := []byte("this is a blob")
content := []byte("this is a blob7")
digest := godigest.FromBytes(content)
So(digest, ShouldNotBeNil)
// monolithic blob upload: success
+10 -2
View File
@@ -2,7 +2,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["storage.go"],
srcs = [
"cache.go",
"storage.go",
],
importpath = "github.com/anuvu/zot/pkg/storage",
visibility = ["//visibility:public"],
deps = [
@@ -13,16 +16,21 @@ go_library(
"@com_github_opencontainers_image_spec//specs-go/v1:go_default_library",
"@com_github_opensuse_umoci//:go_default_library",
"@com_github_rs_zerolog//:go_default_library",
"@io_etcd_go_bbolt//:go_default_library",
],
)
go_test(
name = "go_default_test",
timeout = "short",
srcs = ["storage_test.go"],
srcs = [
"cache_test.go",
"storage_test.go",
],
embed = [":go_default_library"],
race = "on",
deps = [
"//errors:go_default_library",
"//pkg/log:go_default_library",
"@com_github_opencontainers_go_digest//:go_default_library",
"@com_github_opencontainers_image_spec//specs-go/v1:go_default_library",
+163
View File
@@ -0,0 +1,163 @@
package storage
import (
"path"
"strings"
"github.com/anuvu/zot/errors"
zlog "github.com/anuvu/zot/pkg/log"
"go.etcd.io/bbolt"
)
const (
BlobsCache = "blobs"
)
type Cache struct {
db *bbolt.DB
log zlog.Logger
}
// Blob is a blob record
type Blob struct {
Path string
}
func NewCache(rootDir string, name string, log zlog.Logger) *Cache {
dbPath := path.Join(rootDir, name+".db")
db, err := bbolt.Open(dbPath, 0600, nil)
if err != nil {
log.Error().Err(err).Str("dbPath", dbPath).Msg("unable to create cache db")
return nil
}
if err := db.Update(func(tx *bbolt.Tx) error {
if _, err := tx.CreateBucketIfNotExists([]byte(BlobsCache)); err != nil {
// this is a serious failure
log.Error().Err(err).Str("dbPath", dbPath).Msg("unable to create a root bucket")
return err
}
return nil
}); err != nil {
// something went wrong
log.Error().Err(err).Msg("unable to create a cache")
return nil
}
return &Cache{db: db, log: log}
}
func (c *Cache) PutBlob(digest string, path string) error {
if err := c.db.Update(func(tx *bbolt.Tx) error {
root := tx.Bucket([]byte(BlobsCache))
if root == nil {
// this is a serious failure
err := errors.ErrCacheRootBucket
c.log.Error().Err(err).Msg("unable to access root bucket")
return err
}
b, err := root.CreateBucketIfNotExists([]byte(digest))
if err != nil {
// this is a serious failure
c.log.Error().Err(err).Str("bucket", digest).Msg("unable to create a bucket")
return err
}
if err := b.Put([]byte(path), nil); err != nil {
c.log.Error().Err(err).Str("bucket", digest).Str("value", path).Msg("unable to put record")
return err
}
return nil
}); err != nil {
return err
}
return nil
}
func (c *Cache) GetBlob(digest string) (string, error) {
var blobPath strings.Builder
if err := c.db.View(func(tx *bbolt.Tx) error {
root := tx.Bucket([]byte(BlobsCache))
if root == nil {
// this is a serious failure
err := errors.ErrCacheRootBucket
c.log.Error().Err(err).Msg("unable to access root bucket")
return err
}
b := root.Bucket([]byte(digest))
if b != nil {
// get first key
c := b.Cursor()
k, _ := c.First()
blobPath.WriteString(string(k))
return nil
}
return errors.ErrCacheMiss
}); err != nil {
return "", err
}
if len(blobPath.String()) == 0 {
return "", nil
}
return blobPath.String(), nil
}
func (c *Cache) HasBlob(digest string, blob string) bool {
if err := c.db.View(func(tx *bbolt.Tx) error {
root := tx.Bucket([]byte(BlobsCache))
if root == nil {
// this is a serious failure
err := errors.ErrCacheRootBucket
c.log.Error().Err(err).Msg("unable to access root bucket")
return err
}
b := root.Bucket([]byte(digest))
if b == nil {
return errors.ErrCacheMiss
}
if b.Get([]byte(blob)) == nil {
return errors.ErrCacheMiss
}
return nil
}); err != nil {
return false
}
return true
}
func (c *Cache) DeleteBlob(digest string, path string) error {
if err := c.db.Update(func(tx *bbolt.Tx) error {
root := tx.Bucket([]byte(BlobsCache))
if root == nil {
// this is a serious failure
err := errors.ErrCacheRootBucket
c.log.Error().Err(err).Msg("unable to access root bucket")
return err
}
b := root.Bucket([]byte(digest))
if b == nil {
return errors.ErrCacheMiss
}
if err := b.Delete([]byte(path)); err != nil {
c.log.Error().Err(err).Str("digest", digest).Str("path", path).Msg("unable to delete")
return err
}
return nil
}); err != nil {
return err
}
return nil
}
+52
View File
@@ -0,0 +1,52 @@
package storage_test
import (
"io/ioutil"
"os"
"testing"
"github.com/anuvu/zot/errors"
"github.com/anuvu/zot/pkg/log"
"github.com/anuvu/zot/pkg/storage"
. "github.com/smartystreets/goconvey/convey"
)
func TestCache(t *testing.T) {
Convey("Make a new cache", t, func() {
dir, err := ioutil.TempDir("", "cache_test")
So(err, ShouldBeNil)
So(dir, ShouldNotBeEmpty)
defer os.RemoveAll(dir)
log := log.NewLogger("debug", "")
So(log, ShouldNotBeNil)
So(storage.NewCache("/deadBEEF", "cache_test", log), ShouldBeNil)
c := storage.NewCache(dir, "cache_test", log)
So(c, ShouldNotBeNil)
v, err := c.GetBlob("key")
So(err, ShouldEqual, errors.ErrCacheMiss)
So(v, ShouldBeEmpty)
b := c.HasBlob("key", "value")
So(b, ShouldBeFalse)
err = c.PutBlob("key", "value")
So(err, ShouldBeNil)
b = c.HasBlob("key", "value")
So(b, ShouldBeTrue)
v, err = c.GetBlob("key")
So(err, ShouldBeNil)
So(v, ShouldNotBeEmpty)
err = c.DeleteBlob("bogusKey", "bogusValue")
So(err, ShouldEqual, errors.ErrCacheMiss)
err = c.DeleteBlob("key", "bogusValue")
So(err, ShouldBeNil)
})
}
+137 -56
View File
@@ -37,24 +37,50 @@ type ImageStore struct {
rootDir string
lock *sync.RWMutex
blobUploads map[string]BlobUpload
cache *Cache
log zerolog.Logger
}
// NewImageStore returns a new image store backed by a file storage.
func NewImageStore(rootDir string, log zlog.Logger) *ImageStore {
is := &ImageStore{rootDir: rootDir,
if _, err := os.Stat(rootDir); os.IsNotExist(err) {
if err := os.MkdirAll(rootDir, 0700); err != nil {
log.Error().Err(err).Str("rootDir", rootDir).Msg("unable to create root dir")
return nil
}
}
is := &ImageStore{
rootDir: rootDir,
lock: &sync.RWMutex{},
blobUploads: make(map[string]BlobUpload),
cache: NewCache(rootDir, "cache", log),
log: log.With().Caller().Logger(),
}
if _, err := os.Stat(rootDir); os.IsNotExist(err) {
_ = os.MkdirAll(rootDir, 0700)
}
return is
}
// RLock read-lock
func (is *ImageStore) RLock() {
is.lock.RLock()
}
// RUnlock read-unlock
func (is *ImageStore) RUnlock() {
is.lock.RUnlock()
}
// Lock write-lock
func (is *ImageStore) Lock() {
is.lock.Lock()
}
// Unlock write-unlock
func (is *ImageStore) Unlock() {
is.lock.Unlock()
}
// InitRepo creates an image repository under this store.
func (is *ImageStore) InitRepo(name string) error {
repoDir := path.Join(is.rootDir, name)
@@ -63,16 +89,10 @@ func (is *ImageStore) InitRepo(name string) error {
return nil
}
// create repo dir
ensureDir(repoDir)
// create "blobs" subdir
dir := path.Join(repoDir, "blobs")
ensureDir(dir)
ensureDir(path.Join(repoDir, "blobs"), is.log)
// create BlobUploadDir subdir
dir = path.Join(repoDir, BlobUploadDir)
ensureDir(dir)
ensureDir(path.Join(repoDir, BlobUploadDir), is.log)
// "oci-layout" file - create if it doesn't exist
ilPath := path.Join(repoDir, ispec.ImageLayoutFile)
@@ -85,7 +105,8 @@ func (is *ImageStore) InitRepo(name string) error {
}
if err := ioutil.WriteFile(ilPath, buf, 0644); err != nil {
is.log.Panic().Err(err).Str("file", ilPath).Msg("unable to write file")
is.log.Error().Err(err).Str("file", ilPath).Msg("unable to write file")
return err
}
}
@@ -101,7 +122,8 @@ func (is *ImageStore) InitRepo(name string) error {
}
if err := ioutil.WriteFile(indexPath, buf, 0644); err != nil {
is.log.Panic().Err(err).Str("file", indexPath).Msg("unable to write file")
is.log.Error().Err(err).Str("file", indexPath).Msg("unable to write file")
return err
}
}
@@ -111,8 +133,8 @@ func (is *ImageStore) InitRepo(name string) error {
// ValidateRepo validates that the repository layout is complaint with the OCI repo layout.
func (is *ImageStore) ValidateRepo(name string) (bool, error) {
// https://github.com/opencontainers/image-spec/blob/master/image-layout.md#content
// at least, expect exactly 4 entries - ["blobs", "oci-layout", "index.json"] and BlobUploadDir
// in each image store
// at least, expect at least 3 entries - ["blobs", "oci-layout", "index.json"]
// and an additional/optional BlobUploadDir in each image store
dir := path.Join(is.rootDir, name)
if !dirExists(dir) {
return false, errors.ErrRepoNotFound
@@ -124,15 +146,14 @@ func (is *ImageStore) ValidateRepo(name string) (bool, error) {
return false, errors.ErrRepoNotFound
}
if len(files) != 4 {
return false, nil
if len(files) < 3 {
return false, errors.ErrRepoBadVersion
}
found := map[string]bool{
"blobs": false,
ispec.ImageLayoutFile: false,
"index.json": false,
BlobUploadDir: false,
}
for _, file := range files {
@@ -195,7 +216,7 @@ func (is *ImageStore) GetRepositories() ([]string, error) {
return nil
}
is.log.Debug().Str("dir", path).Str("name", info.Name()).Msg("found image store")
//is.log.Debug().Str("dir", path).Str("name", info.Name()).Msg("found image store")
stores = append(stores, rel)
return nil
@@ -212,7 +233,6 @@ func (is *ImageStore) GetImageTags(repo string) ([]string, error) {
}
buf, err := ioutil.ReadFile(path.Join(dir, "index.json"))
if err != nil {
is.log.Error().Err(err).Str("dir", dir).Msg("failed to read index.json")
return nil, errors.ErrRepoNotFound
@@ -290,9 +310,7 @@ func (is *ImageStore) GetImageManifest(repo string, reference string) ([]byte, s
return nil, "", "", errors.ErrManifestNotFound
}
p := path.Join(dir, "blobs")
p = path.Join(p, digest.Algorithm().String())
p = path.Join(p, digest.Encoded())
p := path.Join(dir, "blobs", digest.Algorithm().String(), digest.Encoded())
buf, err = ioutil.ReadFile(p)
@@ -319,19 +337,24 @@ func (is *ImageStore) GetImageManifest(repo string, reference string) ([]byte, s
func (is *ImageStore) PutImageManifest(repo string, reference string, mediaType string,
body []byte) (string, error) {
if err := is.InitRepo(repo); err != nil {
is.log.Debug().Err(err).Msg("init repo")
return "", err
}
if mediaType != ispec.MediaTypeImageManifest {
is.log.Debug().Interface("actual", mediaType).
Interface("expected", ispec.MediaTypeImageManifest).Msg("bad manifest media type")
return "", errors.ErrBadManifest
}
if len(body) == 0 {
is.log.Debug().Int("len", len(body)).Msg("invalid body length")
return "", errors.ErrBadManifest
}
var m ispec.Manifest
if err := json.Unmarshal(body, &m); err != nil {
is.log.Error().Err(err).Msg("unable to unmarshal JSON")
return "", errors.ErrBadManifest
}
@@ -345,6 +368,7 @@ func (is *ImageStore) PutImageManifest(repo string, reference string, mediaType
blobPath := is.BlobPath(repo, digest)
if _, err := os.Stat(blobPath); err != nil {
is.log.Error().Err(err).Str("blobPath", blobPath).Msg("unable to find blob")
return digest.String(), errors.ErrBlobNotFound
}
}
@@ -418,13 +442,12 @@ func (is *ImageStore) PutImageManifest(repo string, reference string, mediaType
}
// write manifest to "blobs"
dir = path.Join(is.rootDir, repo)
dir = path.Join(dir, "blobs")
dir = path.Join(dir, mDigest.Algorithm().String())
_ = os.MkdirAll(dir, 0755)
dir = path.Join(is.rootDir, repo, "blobs", mDigest.Algorithm().String())
ensureDir(dir, is.log)
file := path.Join(dir, mDigest.Encoded())
if err := ioutil.WriteFile(file, body, 0644); err != nil {
is.log.Error().Err(err).Str("file", file).Msg("unable to write")
return "", err
}
@@ -435,10 +458,12 @@ func (is *ImageStore) PutImageManifest(repo string, reference string, mediaType
buf, err = json.Marshal(index)
if err != nil {
is.log.Error().Err(err).Str("file", file).Msg("unable to marshal JSON")
return "", err
}
if err := ioutil.WriteFile(file, buf, 0644); err != nil {
is.log.Error().Err(err).Str("file", file).Msg("unable to write")
return "", err
}
@@ -530,9 +555,7 @@ func (is *ImageStore) DeleteImageManifest(repo string, reference string) error {
return err
}
p := path.Join(dir, "blobs")
p = path.Join(p, digest.Algorithm().String())
p = path.Join(p, digest.Encoded())
p := path.Join(dir, "blobs", digest.Algorithm().String(), digest.Encoded())
_ = os.Remove(p)
@@ -542,8 +565,7 @@ func (is *ImageStore) DeleteImageManifest(repo string, reference string) error {
// BlobUploadPath returns the upload path for a blob in this store.
func (is *ImageStore) BlobUploadPath(repo string, uuid string) string {
dir := path.Join(is.rootDir, repo)
blobUploadPath := path.Join(dir, BlobUploadDir)
blobUploadPath = path.Join(blobUploadPath, uuid)
blobUploadPath := path.Join(dir, BlobUploadDir, uuid)
return blobUploadPath
}
@@ -711,14 +733,17 @@ func (is *ImageStore) FinishBlobUpload(repo string, uuid string, body io.Reader,
return errors.ErrBadBlobDigest
}
dir := path.Join(is.rootDir, repo)
dir = path.Join(dir, "blobs")
dir = path.Join(dir, dstDigest.Algorithm().String())
_ = os.MkdirAll(dir, 0755)
dir := path.Join(is.rootDir, repo, "blobs", dstDigest.Algorithm().String())
ensureDir(dir, is.log)
dst := is.BlobPath(repo, dstDigest)
// move the blob from uploads to final dest
_ = os.Rename(src, dst)
if is.cache != nil {
if err := is.DedupeBlob(src, dstDigest, dst); err != nil {
is.log.Error().Err(err).Str("src", src).Str("dstDigest", dstDigest.String()).
Str("dst", dst).Msg("unable to dedupe blob")
return err
}
}
return err
}
@@ -767,34 +792,80 @@ func (is *ImageStore) FullBlobUpload(repo string, body io.Reader, digest string)
return "", -1, errors.ErrBadBlobDigest
}
dir := path.Join(is.rootDir, repo)
dir = path.Join(dir, "blobs")
dir = path.Join(dir, dstDigest.Algorithm().String())
_ = os.MkdirAll(dir, 0755)
dir := path.Join(is.rootDir, repo, "blobs", dstDigest.Algorithm().String())
ensureDir(dir, is.log)
dst := is.BlobPath(repo, dstDigest)
// move the blob from uploads to final dest
_ = os.Rename(src, dst)
if is.cache != nil {
if err := is.DedupeBlob(src, dstDigest, dst); err != nil {
is.log.Error().Err(err).Str("src", src).Str("dstDigest", dstDigest.String()).
Str("dst", dst).Msg("unable to dedupe blob")
return "", -1, err
}
}
return uuid, n, err
}
// nolint (interfacer)
func (is *ImageStore) DedupeBlob(src string, dstDigest godigest.Digest, dst string) error {
dstRecord, err := is.cache.GetBlob(dstDigest.String())
if err != nil && err != errors.ErrCacheMiss {
is.log.Error().Err(err).Str("blobPath", dst).Msg("unable to lookup blob record")
return err
}
if dstRecord == "" {
if err := is.cache.PutBlob(dstDigest.String(), dst); err != nil {
is.log.Error().Err(err).Str("blobPath", dst).Msg("unable to insert blob record")
return err
}
// move the blob from uploads to final dest
if err := os.Rename(src, dst); err != nil {
is.log.Error().Err(err).Str("src", src).Str("dst", dst).Msg("unable to rename blob")
return err
}
} else {
dstRecordFi, err := os.Stat(dstRecord)
if err != nil {
is.log.Error().Err(err).Str("blobPath", dstRecord).Msg("unable to stat")
return err
}
dstFi, err := os.Stat(dst)
if err != nil && !os.IsNotExist(err) {
is.log.Error().Err(err).Str("blobPath", dstRecord).Msg("unable to stat")
return err
}
if !os.SameFile(dstFi, dstRecordFi) {
if err := os.Link(dstRecord, dst); err != nil {
is.log.Error().Err(err).Str("blobPath", dst).Str("link", dstRecord).Msg("unable to hard link")
return err
}
}
if err := os.Remove(src); err != nil {
is.log.Error().Err(err).Str("src", src).Msg("uname to remove blob")
return err
}
}
return nil
}
// DeleteBlobUpload deletes an existing blob upload that is currently in progress.
func (is *ImageStore) DeleteBlobUpload(repo string, uuid string) error {
blobUploadPath := is.BlobUploadPath(repo, uuid)
_ = os.Remove(blobUploadPath)
if err := os.Remove(blobUploadPath); err != nil {
is.log.Error().Err(err).Str("blobUploadPath", blobUploadPath).Msg("error deleting blob upload")
return err
}
return nil
}
// BlobPath returns the repository path of a blob.
func (is *ImageStore) BlobPath(repo string, digest godigest.Digest) string {
dir := path.Join(is.rootDir, repo)
blobPath := path.Join(dir, "blobs")
blobPath = path.Join(blobPath, digest.Algorithm().String())
blobPath = path.Join(blobPath, digest.Encoded())
return blobPath
return path.Join(is.rootDir, repo, "blobs", digest.Algorithm().String(), digest.Encoded())
}
// CheckBlob verifies a blob and returns true if the blob is correct.
@@ -860,7 +931,17 @@ func (is *ImageStore) DeleteBlob(repo string, digest string) error {
return errors.ErrBlobNotFound
}
_ = os.Remove(blobPath)
if is.cache != nil {
if err := is.cache.DeleteBlob(digest, blobPath); err != nil {
is.log.Error().Err(err).Str("digest", digest).Str("blobPath", blobPath).Msg("unable to remove blob path from cache")
return err
}
}
if err := os.Remove(blobPath); err != nil {
is.log.Error().Err(err).Str("blobPath", blobPath).Msg("unable to remove blob path")
return err
}
return nil
}
@@ -888,8 +969,8 @@ func dirExists(d string) bool {
return true
}
func ensureDir(dir string) {
func ensureDir(dir string, log zerolog.Logger) {
if err := os.MkdirAll(dir, 0755); err != nil {
panic(err)
log.Panic().Err(err).Str("dir", dir).Msg("unable to create dir")
}
}
+344 -1
View File
@@ -6,6 +6,9 @@ import (
"encoding/json"
"io/ioutil"
"os"
"path"
"strings"
"sync"
"testing"
"github.com/anuvu/zot/pkg/log"
@@ -86,13 +89,103 @@ func TestAPIs(t *testing.T) {
So(err, ShouldBeNil)
So(b, ShouldBeGreaterThanOrEqualTo, 0)
content := []byte("test-data")
content := []byte("test-data1")
buf := bytes.NewBuffer(content)
l := buf.Len()
d := godigest.FromBytes(content)
b, err = il.PutBlobChunk("test", v, 0, int64(l), buf)
So(err, ShouldBeNil)
So(b, ShouldEqual, l)
blobDigest := d
err = il.FinishBlobUpload("test", v, buf, d.String())
So(err, ShouldBeNil)
So(b, ShouldEqual, l)
_, _, err = il.CheckBlob("test", d.String(), "application/vnd.oci.image.layer.v1.tar+gzip")
So(err, ShouldBeNil)
_, _, err = il.GetBlob("test", d.String(), "application/vnd.oci.image.layer.v1.tar+gzip")
So(err, ShouldBeNil)
m := ispec.Manifest{}
m.SchemaVersion = 2
mb, _ := json.Marshal(m)
Convey("Bad image manifest", func() {
_, err = il.PutImageManifest("test", d.String(), ispec.MediaTypeImageManifest, mb)
So(err, ShouldNotBeNil)
_, _, _, err = il.GetImageManifest("test", d.String())
So(err, ShouldNotBeNil)
})
Convey("Good image manifest", func() {
m := ispec.Manifest{
Config: ispec.Descriptor{
Digest: d,
Size: int64(l),
},
Layers: []ispec.Descriptor{
{
MediaType: "application/vnd.oci.image.layer.v1.tar",
Digest: d,
Size: int64(l),
},
},
}
m.SchemaVersion = 2
mb, _ = json.Marshal(m)
d := godigest.FromBytes(mb)
_, err = il.PutImageManifest("test", d.String(), ispec.MediaTypeImageManifest, mb)
So(err, ShouldBeNil)
_, _, _, err = il.GetImageManifest("test", d.String())
So(err, ShouldBeNil)
err = il.DeleteImageManifest("test", "1.0")
So(err, ShouldNotBeNil)
err = il.DeleteBlob("test", blobDigest.String())
So(err, ShouldBeNil)
err = il.DeleteImageManifest("test", d.String())
So(err, ShouldBeNil)
_, _, _, err = il.GetImageManifest("test", d.String())
So(err, ShouldNotBeNil)
})
})
err = il.DeleteBlobUpload("test", v)
So(err, ShouldNotBeNil)
})
Convey("New blob upload streamed", func() {
v, err := il.NewBlobUpload("test")
So(err, ShouldBeNil)
So(v, ShouldNotBeEmpty)
Convey("Get blob upload", func() {
b, err := il.GetBlobUpload("test", "invalid")
So(err, ShouldNotBeNil)
So(b, ShouldEqual, -1)
b, err = il.GetBlobUpload("test", v)
So(err, ShouldBeNil)
So(b, ShouldBeGreaterThanOrEqualTo, 0)
b, err = il.BlobUploadInfo("test", v)
So(err, ShouldBeNil)
So(b, ShouldBeGreaterThanOrEqualTo, 0)
content := []byte("test-data2")
buf := bytes.NewBuffer(content)
l := buf.Len()
d := godigest.FromBytes(content)
b, err = il.PutBlobChunkStreamed("test", v, buf)
So(err, ShouldBeNil)
So(b, ShouldEqual, l)
err = il.FinishBlobUpload("test", v, buf, d.String())
So(err, ShouldBeNil)
@@ -151,7 +244,257 @@ func TestAPIs(t *testing.T) {
})
err = il.DeleteBlobUpload("test", v)
So(err, ShouldNotBeNil)
})
Convey("Dedupe", func() {
blobDigest1 := ""
blobDigest2 := ""
// manifest1
v, err := il.NewBlobUpload("dedupe1")
So(err, ShouldBeNil)
So(v, ShouldNotBeEmpty)
content := []byte("test-data3")
buf := bytes.NewBuffer(content)
l := buf.Len()
d := godigest.FromBytes(content)
b, err := il.PutBlobChunkStreamed("dedupe1", v, buf)
So(err, ShouldBeNil)
So(b, ShouldEqual, l)
blobDigest1 = strings.Split(d.String(), ":")[1]
So(blobDigest1, ShouldNotBeEmpty)
err = il.FinishBlobUpload("dedupe1", v, buf, d.String())
So(err, ShouldBeNil)
So(b, ShouldEqual, l)
_, _, err = il.CheckBlob("dedupe1", d.String(), "application/vnd.oci.image.layer.v1.tar+gzip")
So(err, ShouldBeNil)
_, _, err = il.GetBlob("dedupe1", d.String(), "application/vnd.oci.image.layer.v1.tar+gzip")
So(err, ShouldBeNil)
m := ispec.Manifest{}
m.SchemaVersion = 2
m = ispec.Manifest{
Config: ispec.Descriptor{
Digest: d,
Size: int64(l),
},
Layers: []ispec.Descriptor{
{
MediaType: "application/vnd.oci.image.layer.v1.tar",
Digest: d,
Size: int64(l),
},
},
}
m.SchemaVersion = 2
mb, _ := json.Marshal(m)
d = godigest.FromBytes(mb)
_, err = il.PutImageManifest("dedupe1", d.String(), ispec.MediaTypeImageManifest, mb)
So(err, ShouldBeNil)
_, _, _, err = il.GetImageManifest("dedupe1", d.String())
So(err, ShouldBeNil)
// manifest2
v, err = il.NewBlobUpload("dedupe2")
So(err, ShouldBeNil)
So(v, ShouldNotBeEmpty)
content = []byte("test-data3")
buf = bytes.NewBuffer(content)
l = buf.Len()
d = godigest.FromBytes(content)
b, err = il.PutBlobChunkStreamed("dedupe2", v, buf)
So(err, ShouldBeNil)
So(b, ShouldEqual, l)
blobDigest2 = strings.Split(d.String(), ":")[1]
So(blobDigest2, ShouldNotBeEmpty)
err = il.FinishBlobUpload("dedupe2", v, buf, d.String())
So(err, ShouldBeNil)
So(b, ShouldEqual, l)
_, _, err = il.CheckBlob("dedupe2", d.String(), "application/vnd.oci.image.layer.v1.tar+gzip")
So(err, ShouldBeNil)
_, _, err = il.GetBlob("dedupe2", d.String(), "application/vnd.oci.image.layer.v1.tar+gzip")
So(err, ShouldBeNil)
m = ispec.Manifest{}
m.SchemaVersion = 2
m = ispec.Manifest{
Config: ispec.Descriptor{
Digest: d,
Size: int64(l),
},
Layers: []ispec.Descriptor{
{
MediaType: "application/vnd.oci.image.layer.v1.tar",
Digest: d,
Size: int64(l),
},
},
}
m.SchemaVersion = 2
mb, _ = json.Marshal(m)
d = godigest.FromBytes(mb)
_, err = il.PutImageManifest("dedupe2", "1.0", ispec.MediaTypeImageManifest, mb)
So(err, ShouldBeNil)
_, _, _, err = il.GetImageManifest("dedupe2", d.String())
So(err, ShouldBeNil)
// verify that dedupe with hard links happened
fi1, err := os.Stat(path.Join(dir, "dedupe2", "blobs", "sha256", blobDigest1))
So(err, ShouldBeNil)
fi2, err := os.Stat(path.Join(dir, "dedupe2", "blobs", "sha256", blobDigest2))
So(err, ShouldBeNil)
So(os.SameFile(fi1, fi2), ShouldBeTrue)
})
Convey("Locks", func() {
// in parallel, a mix of read and write locks - mainly for coverage
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(2)
go func() {
defer wg.Done()
il.Lock()
func() {}()
il.Unlock()
}()
go func() {
defer wg.Done()
il.RLock()
func() {}()
il.RUnlock()
}()
}
wg.Wait()
})
})
}
func TestDedupe(t *testing.T) {
Convey("Dedupe", t, func(c C) {
Convey("Nil ImageStore", func() {
is := &storage.ImageStore{}
So(func() { _ = is.DedupeBlob("", "", "") }, ShouldPanic)
})
Convey("Valid ImageStore", func() {
dir, err := ioutil.TempDir("", "oci-repo-test")
if err != nil {
panic(err)
}
defer os.RemoveAll(dir)
is := storage.NewImageStore(dir, log.Logger{Logger: zerolog.New(os.Stdout)})
So(is.DedupeBlob("", "", ""), ShouldNotBeNil)
})
})
}
func TestNegativeCases(t *testing.T) {
Convey("Invalid root dir", t, func(c C) {
dir, err := ioutil.TempDir("", "oci-repo-test")
if err != nil {
panic(err)
}
os.RemoveAll(dir)
So(storage.NewImageStore(dir, log.Logger{Logger: zerolog.New(os.Stdout)}), ShouldNotBeNil)
So(storage.NewImageStore("/deadBEEF", log.Logger{Logger: zerolog.New(os.Stdout)}), ShouldBeNil)
})
Convey("Invalid init repo", t, func(c C) {
dir, err := ioutil.TempDir("", "oci-repo-test")
if err != nil {
panic(err)
}
defer os.RemoveAll(dir)
il := storage.NewImageStore(dir, log.Logger{Logger: zerolog.New(os.Stdout)})
err = os.Chmod(dir, 0000) // remove all perms
So(err, ShouldBeNil)
So(func() { _ = il.InitRepo("test") }, ShouldPanic)
})
Convey("Invalid validate repo", t, func(c C) {
dir, err := ioutil.TempDir("", "oci-repo-test")
if err != nil {
panic(err)
}
defer os.RemoveAll(dir)
il := storage.NewImageStore(dir, log.Logger{Logger: zerolog.New(os.Stdout)})
So(il, ShouldNotBeNil)
So(il.InitRepo("test"), ShouldBeNil)
files, err := ioutil.ReadDir(path.Join(dir, "test"))
So(err, ShouldBeNil)
for _, f := range files {
os.Remove(path.Join(dir, "test", f.Name()))
}
_, err = il.ValidateRepo("test")
So(err, ShouldNotBeNil)
os.RemoveAll(path.Join(dir, "test"))
_, err = il.ValidateRepo("test")
So(err, ShouldNotBeNil)
err = os.Chmod(dir, 0000) // remove all perms
So(err, ShouldBeNil)
So(func() { _, _ = il.ValidateRepo("test") }, ShouldPanic)
os.RemoveAll(dir)
_, err = il.GetRepositories()
So(err, ShouldNotBeNil)
})
Convey("Invalid get image tags", t, func(c C) {
il := &storage.ImageStore{}
_, err := il.GetImageTags("test")
So(err, ShouldNotBeNil)
dir, err := ioutil.TempDir("", "oci-repo-test")
if err != nil {
panic(err)
}
defer os.RemoveAll(dir)
il = storage.NewImageStore(dir, log.Logger{Logger: zerolog.New(os.Stdout)})
So(il, ShouldNotBeNil)
So(il.InitRepo("test"), ShouldBeNil)
So(os.Remove(path.Join(dir, "test", "index.json")), ShouldBeNil)
_, err = il.GetImageTags("test")
So(err, ShouldNotBeNil)
So(os.RemoveAll(path.Join(dir, "test")), ShouldBeNil)
So(il.InitRepo("test"), ShouldBeNil)
So(ioutil.WriteFile(path.Join(dir, "test", "index.json"), []byte{}, 0755), ShouldBeNil)
_, err = il.GetImageTags("test")
So(err, ShouldNotBeNil)
})
Convey("Invalid get image manifest", t, func(c C) {
il := &storage.ImageStore{}
_, _, _, err := il.GetImageManifest("test", "")
So(err, ShouldNotBeNil)
dir, err := ioutil.TempDir("", "oci-repo-test")
if err != nil {
panic(err)
}
defer os.RemoveAll(dir)
il = storage.NewImageStore(dir, log.Logger{Logger: zerolog.New(os.Stdout)})
So(il, ShouldNotBeNil)
So(il.InitRepo("test"), ShouldBeNil)
So(os.Remove(path.Join(dir, "test", "index.json")), ShouldBeNil)
_, _, _, err = il.GetImageManifest("test", "")
So(err, ShouldNotBeNil)
So(os.RemoveAll(path.Join(dir, "test")), ShouldBeNil)
So(il.InitRepo("test"), ShouldBeNil)
So(ioutil.WriteFile(path.Join(dir, "test", "index.json"), []byte{}, 0755), ShouldBeNil)
_, _, _, err = il.GetImageManifest("test", "")
So(err, ShouldNotBeNil)
})
}