mirror of
https://github.com/project-zot/zot.git
synced 2026-06-17 21:17:58 +08:00
feat(sync): add tests for stream mgr,store
Signed-off-by: Vishwas Rajashekar <dev@vrajashkr.com>
This commit is contained in:
@@ -0,0 +1,288 @@
|
||||
//go:build sync
|
||||
|
||||
package sync
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
godigest "github.com/opencontainers/go-digest"
|
||||
"github.com/regclient/regclient/types/descriptor"
|
||||
rcManifest "github.com/regclient/regclient/types/manifest"
|
||||
rcOCIV1 "github.com/regclient/regclient/types/oci/v1"
|
||||
. "github.com/smartystreets/goconvey/convey"
|
||||
|
||||
zerr "zotregistry.dev/zot/v2/errors"
|
||||
"zotregistry.dev/zot/v2/pkg/log"
|
||||
)
|
||||
|
||||
type mockStreamTempStore struct {
|
||||
blobPathFn func(godigest.Digest) string
|
||||
}
|
||||
|
||||
func (m *mockStreamTempStore) BlobPath(dig godigest.Digest) string {
|
||||
if m.blobPathFn != nil {
|
||||
return m.blobPathFn(dig)
|
||||
}
|
||||
|
||||
return "/nonexistent/dir/" + dig.Encoded()
|
||||
}
|
||||
|
||||
func newTestChunkingStreamManager(dir string) *ChunkingStreamManager {
|
||||
logger := log.NewTestLogger()
|
||||
|
||||
return &ChunkingStreamManager{
|
||||
tempStore: NewLocalTempStore(dir, logger),
|
||||
activeStreams: map[string]*ChunkedBlobReader{},
|
||||
streamingRefs: map[string]rcManifest.Manifest{},
|
||||
blobInfoMap: map[string]descriptor.Descriptor{},
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
func newTestOCIManifestWithBlobs(t *testing.T, configData, layerData []byte) rcManifest.Manifest {
|
||||
t.Helper()
|
||||
|
||||
origMan := rcOCIV1.Manifest{
|
||||
Versioned: rcOCIV1.ManifestSchemaVersion,
|
||||
Config: descriptor.Descriptor{
|
||||
MediaType: "application/vnd.oci.image.config.v1+json",
|
||||
Digest: godigest.FromBytes(configData),
|
||||
Size: int64(len(configData)),
|
||||
},
|
||||
Layers: []descriptor.Descriptor{
|
||||
{
|
||||
MediaType: "application/vnd.oci.image.layer.v1.tar+gzip",
|
||||
Digest: godigest.FromBytes(layerData),
|
||||
Size: int64(len(layerData)),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
m, err := rcManifest.New(rcManifest.WithOrig(origMan))
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create test OCI manifest: %v", err)
|
||||
}
|
||||
|
||||
return m
|
||||
}
|
||||
|
||||
func TestChunkingStreamManagerConnectClient(t *testing.T) {
|
||||
Convey("ConnectClient", t, func() {
|
||||
sm := newTestChunkingStreamManager(t.TempDir())
|
||||
|
||||
Convey("returns ErrBlobNotFoundInActiveStreams when blob is not active", func() {
|
||||
digest := "sha256:" + strings.Repeat("a", 64)
|
||||
copier, err := sm.ConnectClient(digest, &bytes.Buffer{})
|
||||
So(errors.Is(err, zerr.ErrBlobNotFoundInActiveStreams), ShouldBeTrue)
|
||||
So(copier, ShouldBeNil)
|
||||
})
|
||||
|
||||
Convey("returns error for an unparseable blob digest", func() {
|
||||
copier, err := sm.ConnectClient("not-a-valid-digest", &bytes.Buffer{})
|
||||
So(err, ShouldNotBeNil)
|
||||
So(copier, ShouldBeNil)
|
||||
})
|
||||
|
||||
Convey("returns an InFlightBlobCopier for an active blob", func() {
|
||||
blobData := []byte("test blob content")
|
||||
desc := descriptor.Descriptor{
|
||||
Digest: godigest.FromBytes(blobData),
|
||||
Size: int64(len(blobData)),
|
||||
MediaType: "application/octet-stream",
|
||||
}
|
||||
|
||||
err := sm.prepareActiveStreamForBlob(desc)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
copier, err := sm.ConnectClient(desc.Digest.String(), &bytes.Buffer{})
|
||||
So(err, ShouldBeNil)
|
||||
So(copier, ShouldNotBeNil)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestChunkingStreamManagerCachedBlobInfo(t *testing.T) {
|
||||
Convey("CachedBlobInfo", t, func() {
|
||||
sm := newTestChunkingStreamManager(t.TempDir())
|
||||
|
||||
Convey("returns ErrBlobNotFound for an unknown blob", func() {
|
||||
digest := "sha256:" + strings.Repeat("b", 64)
|
||||
size, mt, err := sm.CachedBlobInfo(digest)
|
||||
So(errors.Is(err, zerr.ErrBlobNotFound), ShouldBeTrue)
|
||||
So(size, ShouldEqual, 0)
|
||||
So(mt, ShouldBeEmpty)
|
||||
})
|
||||
|
||||
Convey("returns size and media type for a known blob", func() {
|
||||
blobData := []byte("cached blob data")
|
||||
desc := descriptor.Descriptor{
|
||||
Digest: godigest.FromBytes(blobData),
|
||||
Size: int64(len(blobData)),
|
||||
MediaType: "application/vnd.oci.image.layer.v1.tar+gzip",
|
||||
}
|
||||
|
||||
sm.blobInfoMap[desc.Digest.String()] = desc
|
||||
|
||||
size, mt, err := sm.CachedBlobInfo(desc.Digest.String())
|
||||
So(err, ShouldBeNil)
|
||||
So(size, ShouldEqual, int64(len(blobData)))
|
||||
So(mt, ShouldEqual, "application/vnd.oci.image.layer.v1.tar+gzip")
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestChunkingStreamManagerStreamingBlobReader(t *testing.T) {
|
||||
Convey("StreamingBlobReader", t, func() {
|
||||
sm := newTestChunkingStreamManager(t.TempDir())
|
||||
|
||||
Convey("returns ErrBlobReaderMissing when blob has no active stream", func() {
|
||||
data := []byte("some blob")
|
||||
reader := newTestBReader(data)
|
||||
result, err := sm.StreamingBlobReader(reader)
|
||||
So(errors.Is(err, zerr.ErrBlobReaderMissing), ShouldBeTrue)
|
||||
So(result, ShouldBeNil)
|
||||
})
|
||||
|
||||
Convey("initialises the chunked reader and returns a wrapped BReader for an active stream", func() {
|
||||
data := []byte("streaming blob")
|
||||
desc := descriptor.Descriptor{
|
||||
Digest: godigest.FromBytes(data),
|
||||
Size: int64(len(data)),
|
||||
MediaType: "application/octet-stream",
|
||||
}
|
||||
|
||||
err := sm.prepareActiveStreamForBlob(desc)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
reader := newTestBReader(data)
|
||||
result, err := sm.StreamingBlobReader(reader)
|
||||
So(err, ShouldBeNil)
|
||||
So(result, ShouldNotBeNil)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestChunkingStreamManagerStoreImageForStreaming(t *testing.T) {
|
||||
Convey("StoreImageForStreaming", t, func() {
|
||||
sm := newTestChunkingStreamManager(t.TempDir())
|
||||
|
||||
configData := []byte("config-payload")
|
||||
layerData := []byte("layer-payload")
|
||||
manifest := newTestOCIManifestWithBlobs(t, configData, layerData)
|
||||
|
||||
Convey("stores manifest and prepares active streams for all blobs", func() {
|
||||
err := sm.StoreImageForStreaming("myrepo", "v1.0", manifest)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
// Manifest entry should be stored.
|
||||
m, ok := sm.StreamingImageManifest("myrepo", "v1.0")
|
||||
So(ok, ShouldBeTrue)
|
||||
So(m, ShouldEqual, manifest)
|
||||
|
||||
// All three blobs (manifest, config, layer) should be active streams.
|
||||
manifestDigest := manifest.GetDescriptor().Digest.String()
|
||||
configDigest := godigest.FromBytes(configData).String()
|
||||
layerDigest := godigest.FromBytes(layerData).String()
|
||||
|
||||
_, hasManifest := sm.activeStreams[manifestDigest]
|
||||
So(hasManifest, ShouldBeTrue)
|
||||
|
||||
_, hasConfig := sm.activeStreams[configDigest]
|
||||
So(hasConfig, ShouldBeTrue)
|
||||
|
||||
_, hasLayer := sm.activeStreams[layerDigest]
|
||||
So(hasLayer, ShouldBeTrue)
|
||||
})
|
||||
|
||||
Convey("storing the same repo:reference is idempotent", func() {
|
||||
err := sm.StoreImageForStreaming("myrepo", "v1.0", manifest)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
err = sm.StoreImageForStreaming("myrepo", "v1.0", manifest)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
_, ok := sm.StreamingImageManifest("myrepo", "v1.0")
|
||||
So(ok, ShouldBeTrue)
|
||||
})
|
||||
|
||||
Convey("propagates error when the temp store cannot create a blob path", func() {
|
||||
sm.tempStore = &mockStreamTempStore{
|
||||
blobPathFn: func(_ godigest.Digest) string {
|
||||
return "/nonexistent/dir/blob"
|
||||
},
|
||||
}
|
||||
|
||||
err := sm.StoreImageForStreaming("myrepo", "v1.0", manifest)
|
||||
So(err, ShouldNotBeNil)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestChunkingStreamManagerStreamingImageManifest(t *testing.T) {
|
||||
Convey("StreamingImageManifest", t, func() {
|
||||
sm := newTestChunkingStreamManager(t.TempDir())
|
||||
|
||||
manifest := newTestOCIManifestWithBlobs(t, []byte("cfg"), []byte("lyr"))
|
||||
|
||||
Convey("returns nil and false when no entry exists", func() {
|
||||
m, ok := sm.StreamingImageManifest("repo", "tag")
|
||||
So(ok, ShouldBeFalse)
|
||||
So(m, ShouldBeNil)
|
||||
})
|
||||
|
||||
Convey("returns the manifest and true after it has been stored", func() {
|
||||
err := sm.StoreImageForStreaming("repo", "tag", manifest)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
m, ok := sm.StreamingImageManifest("repo", "tag")
|
||||
So(ok, ShouldBeTrue)
|
||||
So(m, ShouldEqual, manifest)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestChunkingStreamManagerRemoveStreamingImage(t *testing.T) {
|
||||
Convey("RemoveStreamingImage", t, func() {
|
||||
sm := newTestChunkingStreamManager(t.TempDir())
|
||||
|
||||
Convey("does not panic when no entry exists for the given repo:reference", func() {
|
||||
So(func() { sm.RemoveStreamingImage("nothere", "v0") }, ShouldNotPanic)
|
||||
})
|
||||
|
||||
Convey("removes manifest and all associated blobs after a successful store", func() {
|
||||
configData := []byte("cfg-payload")
|
||||
layerData := []byte("lyr-payload")
|
||||
manifest := newTestOCIManifestWithBlobs(t, configData, layerData)
|
||||
|
||||
err := sm.StoreImageForStreaming("myrepo", "latest", manifest)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
manifestDigest := manifest.GetDescriptor().Digest.String()
|
||||
configDigest := godigest.FromBytes(configData).String()
|
||||
layerDigest := godigest.FromBytes(layerData).String()
|
||||
|
||||
// Confirm blobs are active before removal.
|
||||
_, ok := sm.activeStreams[manifestDigest]
|
||||
So(ok, ShouldBeTrue)
|
||||
|
||||
sm.RemoveStreamingImage("myrepo", "latest")
|
||||
|
||||
// Manifest entry should be gone.
|
||||
_, found := sm.StreamingImageManifest("myrepo", "latest")
|
||||
So(found, ShouldBeFalse)
|
||||
|
||||
// All active streams should be cleaned up.
|
||||
_, stillHasManifest := sm.activeStreams[manifestDigest]
|
||||
So(stillHasManifest, ShouldBeFalse)
|
||||
|
||||
_, stillHasConfig := sm.activeStreams[configDigest]
|
||||
So(stillHasConfig, ShouldBeFalse)
|
||||
|
||||
_, stillHasLayer := sm.activeStreams[layerDigest]
|
||||
So(stillHasLayer, ShouldBeFalse)
|
||||
})
|
||||
})
|
||||
}
|
||||
@@ -0,0 +1,101 @@
|
||||
//go:build sync
|
||||
|
||||
package sync_test
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
godigest "github.com/opencontainers/go-digest"
|
||||
. "github.com/smartystreets/goconvey/convey"
|
||||
|
||||
pkgsync "zotregistry.dev/zot/v2/pkg/extensions/sync"
|
||||
"zotregistry.dev/zot/v2/pkg/log"
|
||||
)
|
||||
|
||||
func TestNewLocalTempStore(t *testing.T) {
|
||||
Convey("NewLocalTempStore", t, func() {
|
||||
logger := log.NewTestLogger()
|
||||
|
||||
Convey("returns a non-nil store when the root directory already exists", func() {
|
||||
dir := t.TempDir()
|
||||
|
||||
store := pkgsync.NewLocalTempStore(dir, logger)
|
||||
So(store, ShouldNotBeNil)
|
||||
|
||||
info, err := os.Stat(dir)
|
||||
So(err, ShouldBeNil)
|
||||
So(info.IsDir(), ShouldBeTrue)
|
||||
})
|
||||
|
||||
Convey("creates the root directory when it does not exist", func() {
|
||||
dir := filepath.Join(t.TempDir(), "new", "nested", "dir")
|
||||
|
||||
_, err := os.Stat(dir)
|
||||
So(os.IsNotExist(err), ShouldBeTrue)
|
||||
|
||||
store := pkgsync.NewLocalTempStore(dir, logger)
|
||||
So(store, ShouldNotBeNil)
|
||||
|
||||
info, err := os.Stat(dir)
|
||||
So(err, ShouldBeNil)
|
||||
So(info.IsDir(), ShouldBeTrue)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestLocalTempStoreBlobPath(t *testing.T) {
|
||||
Convey("LocalTempStore.BlobPath", t, func() {
|
||||
dir := t.TempDir()
|
||||
store := pkgsync.NewLocalTempStore(dir, log.NewTestLogger())
|
||||
|
||||
data := []byte("blob payload")
|
||||
dig := godigest.FromBytes(data)
|
||||
|
||||
Convey("returns a path with the expected format rootDir/algorithm/encoded", func() {
|
||||
blobPath := store.BlobPath(dig)
|
||||
expectedPath := filepath.Join(dir, dig.Algorithm().String(), dig.Encoded())
|
||||
So(blobPath, ShouldEqual, expectedPath)
|
||||
})
|
||||
|
||||
Convey("creates the algorithm sub-directory on first call", func() {
|
||||
algorithmDir := filepath.Join(dir, dig.Algorithm().String())
|
||||
|
||||
// Sub-directory must not exist yet.
|
||||
_, err := os.Stat(algorithmDir)
|
||||
So(os.IsNotExist(err), ShouldBeTrue)
|
||||
|
||||
store.BlobPath(dig)
|
||||
|
||||
info, err := os.Stat(algorithmDir)
|
||||
So(err, ShouldBeNil)
|
||||
So(info.IsDir(), ShouldBeTrue)
|
||||
})
|
||||
|
||||
Convey("is idempotent — repeated calls return the same path", func() {
|
||||
first := store.BlobPath(dig)
|
||||
second := store.BlobPath(dig)
|
||||
So(first, ShouldEqual, second)
|
||||
})
|
||||
|
||||
Convey("different digests produce different paths under the same root", func() {
|
||||
dig2 := godigest.FromBytes([]byte("other payload"))
|
||||
|
||||
path1 := store.BlobPath(dig)
|
||||
path2 := store.BlobPath(dig2)
|
||||
So(path1, ShouldNotEqual, path2)
|
||||
})
|
||||
|
||||
Convey("returned path is writable — a file can be created there", func() {
|
||||
blobPath := store.BlobPath(dig)
|
||||
|
||||
err := os.WriteFile(blobPath, data, 0o600)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
got, err := os.ReadFile(blobPath)
|
||||
So(err, ShouldBeNil)
|
||||
So(got, ShouldResemble, data)
|
||||
})
|
||||
})
|
||||
}
|
||||
Reference in New Issue
Block a user