spec: added support for mount request using hard link

This commit is contained in:
Shivam Mishra
2021-04-23 15:51:24 -07:00
committed by Ramkumar Chinchani
parent f7829d6470
commit a7c17b7c16
5 changed files with 341 additions and 32 deletions
+92 -16
View File
@@ -114,9 +114,19 @@ func (is *ImageStore) initRepo(name string) error {
}
// create "blobs" subdir
ensureDir(path.Join(repoDir, "blobs"), is.log)
err := ensureDir(path.Join(repoDir, "blobs"), is.log)
if err != nil {
is.log.Error().Err(err).Msg("error creating blobs subdir")
return err
}
// create BlobUploadDir subdir
ensureDir(path.Join(repoDir, BlobUploadDir), is.log)
err = ensureDir(path.Join(repoDir, BlobUploadDir), is.log)
if err != nil {
is.log.Error().Err(err).Msg("error creating blob upload subdir")
return err
}
// "oci-layout" file - create if it doesn't exist
ilPath := path.Join(repoDir, ispec.ImageLayoutFile)
@@ -497,7 +507,7 @@ func (is *ImageStore) PutImageManifest(repo string, reference string, mediaType
// write manifest to "blobs"
dir = path.Join(is.rootDir, repo, "blobs", mDigest.Algorithm().String())
ensureDir(dir, is.log)
_ = ensureDir(dir, is.log)
file := path.Join(dir, mDigest.Encoded())
if err := ioutil.WriteFile(file, body, 0600); err != nil {
@@ -630,6 +640,8 @@ func (is *ImageStore) BlobUploadPath(repo string, uuid string) string {
// NewBlobUpload returns the unique ID for an upload in progress.
func (is *ImageStore) NewBlobUpload(repo string) (string, error) {
if err := is.InitRepo(repo); err != nil {
is.log.Error().Err(err).Msg("error initializing repo")
return "", err
}
@@ -796,7 +808,13 @@ func (is *ImageStore) FinishBlobUpload(repo string, uuid string, body io.Reader,
is.Lock()
defer is.Unlock()
ensureDir(dir, is.log)
err = ensureDir(dir, is.log)
if err != nil {
is.log.Error().Err(err).Msg("error creating blobs/sha256 dir")
return err
}
dst := is.BlobPath(repo, dstDigest)
if is.dedupe && is.cache != nil {
@@ -865,7 +883,7 @@ func (is *ImageStore) FullBlobUpload(repo string, body io.Reader, digest string)
is.Lock()
defer is.Unlock()
ensureDir(dir, is.log)
_ = ensureDir(dir, is.log)
dst := is.BlobPath(repo, dstDigest)
if is.dedupe && is.cache != nil {
@@ -967,6 +985,37 @@ func (is *ImageStore) BlobPath(repo string, digest godigest.Digest) string {
return path.Join(is.rootDir, repo, "blobs", digest.Algorithm().String(), digest.Encoded())
}
func (is *ImageStore) MountBlob(repo string, mountRepo string, digest string) error {
d, err := godigest.Parse(digest)
if err != nil {
is.log.Error().Err(err).Str("digest", digest).Msg("failed to parse digest")
return errors.ErrBadBlobDigest
}
mountBlobPath := is.BlobPath(mountRepo, d)
is.log.Debug().Str("mount path", mountBlobPath)
blobPath := is.BlobPath(repo, d)
is.log.Debug().Str("repo path", blobPath)
_, err = os.Stat(mountBlobPath)
if err != nil {
is.log.Error().Err(err).Msg("mount: blob path not found")
return errors.ErrBlobNotFound
}
_, err = is.copyBlob(repo, blobPath, mountBlobPath)
if err != nil {
is.log.Error().Err(err).Msg("cache: error copying blobs from cache location")
return err
}
return nil
}
// CheckBlob verifies a blob and returns true if the blob is correct.
func (is *ImageStore) CheckBlob(repo string, digest string,
mediaType string) (bool, int64, error) {
@@ -988,44 +1037,67 @@ func (is *ImageStore) CheckBlob(repo string, digest string,
blobInfo, err := os.Stat(blobPath)
if err == nil {
is.log.Debug().Str("blob path", blobPath).Msg("blob path found")
return true, blobInfo.Size(), nil
}
is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to stat blob")
if !is.dedupe || is.cache == nil {
// Check blobs in cache
dstRecord, err := is.checkCacheBlob(digest)
if err != nil {
is.log.Error().Err(err).Str("digest", digest).Msg("cache: not found")
return false, -1, errors.ErrBlobNotFound
}
// lookup cache and if found, dedupe here
dstRecord, err := is.cache.GetBlob(digest)
// If found copy to location
blobSize, err := is.copyBlob(repo, blobPath, dstRecord)
if err != nil {
return false, -1, errors.ErrBlobNotFound
}
return true, blobSize, nil
}
func (is *ImageStore) checkCacheBlob(digest string) (string, error) {
if !is.dedupe || is.cache == nil {
return "", errors.ErrBlobNotFound
}
dstRecord, err := is.cache.GetBlob(digest)
if err != nil {
return "", err
}
dstRecord = path.Join(is.rootDir, dstRecord)
is.log.Debug().Str("digest", digest).Str("dstRecord", dstRecord).Msg("cache: found dedupe record")
return dstRecord, nil
}
func (is *ImageStore) copyBlob(repo string, blobPath string, dstRecord string) (int64, error) {
if err := is.initRepo(repo); err != nil {
is.log.Error().Err(err).Str("repo", repo).Msg("unable to initialize an empty repo")
return false, -1, err
return -1, err
}
ensureDir(filepath.Dir(blobPath), is.log)
_ = ensureDir(filepath.Dir(blobPath), is.log)
if err := os.Link(dstRecord, blobPath); err != nil {
is.log.Error().Err(err).Str("blobPath", blobPath).Str("link", dstRecord).Msg("dedupe: unable to hard link")
return false, -1, errors.ErrBlobNotFound
return -1, errors.ErrBlobNotFound
}
blobInfo, err = os.Stat(blobPath)
blobInfo, err := os.Stat(blobPath)
if err == nil {
return true, blobInfo.Size(), nil
return blobInfo.Size(), nil
}
return false, -1, errors.ErrBlobNotFound
return -1, errors.ErrBlobNotFound
}
// GetBlob returns a stream to read the blob.
@@ -1115,10 +1187,14 @@ func dirExists(d string) bool {
return true
}
func ensureDir(dir string, log zerolog.Logger) {
func ensureDir(dir string, log zerolog.Logger) error {
if err := os.MkdirAll(dir, 0755); err != nil {
log.Panic().Err(err).Str("dir", dir).Msg("unable to create dir")
log.Error().Err(err).Str("dir", dir).Msg("unable to create dir")
return err
}
return nil
}
func ifOlderThan(is *ImageStore, repo string, delay time.Duration) casext.GCPolicy {
+2 -1
View File
@@ -518,7 +518,8 @@ func TestNegativeCases(t *testing.T) {
err = os.Chmod(dir, 0000) // remove all perms
So(err, ShouldBeNil)
if os.Geteuid() != 0 {
So(func() { _ = il.InitRepo("test") }, ShouldPanic)
err = il.InitRepo("test")
So(err, ShouldNotBeNil)
}
})