feat(sync): add tests for ondemand, service

Signed-off-by: Vishwas Rajashekar <dev@vrajashkr.com>
This commit is contained in:
Vishwas Rajashekar
2026-05-19 23:56:04 +05:30
parent cf8f02f919
commit d4f764c0f0
+449 -1
View File
@@ -8,6 +8,7 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/http/httptest"
"os"
@@ -19,6 +20,9 @@ import (
godigest "github.com/opencontainers/go-digest"
ispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/regclient/regclient"
rcBlob "github.com/regclient/regclient/types/blob"
rcManifest "github.com/regclient/regclient/types/manifest"
rcOCIV1 "github.com/regclient/regclient/types/oci/v1"
"github.com/regclient/regclient/types/ref"
. "github.com/smartystreets/goconvey/convey"
@@ -1274,7 +1278,310 @@ func TestDestinationRegistry(t *testing.T) {
})
}
// TestNewClientTimeoutBehavior verifies that newClient creates a client that respects timeouts.
type mockStreamManager struct {
streamingImageManifestFn func(repo, reference string) (rcManifest.Manifest, bool)
storeImageForStreamingFn func(repo, reference string, m rcManifest.Manifest) error
removeStreamingImageFn func(repo, reference string)
}
func (m *mockStreamManager) StreamingImageManifest(repo, reference string) (rcManifest.Manifest, bool) {
if m.streamingImageManifestFn != nil {
return m.streamingImageManifestFn(repo, reference)
}
return nil, false
}
func (m *mockStreamManager) StoreImageForStreaming(repo, reference string, man rcManifest.Manifest) error {
if m.storeImageForStreamingFn != nil {
return m.storeImageForStreamingFn(repo, reference, man)
}
return nil
}
func (m *mockStreamManager) RemoveStreamingImage(repo, reference string) {
if m.removeStreamingImageFn != nil {
m.removeStreamingImageFn(repo, reference)
}
}
func (m *mockStreamManager) ConnectClient(_ string, _ io.Writer) (*InFlightBlobCopier, error) {
return nil, nil
}
func (m *mockStreamManager) StreamingBlobReader(reader *rcBlob.BReader) (*rcBlob.BReader, error) {
return reader, nil
}
func (m *mockStreamManager) CachedBlobInfo(_ string) (int64, string, error) {
return 0, "", nil
}
type mockSyncService struct {
fetchManifestFn func(ctx context.Context, repo, reference string) (rcManifest.Manifest, error)
isStreamingForRepoFn func(repo string) bool
getSyncTimeoutFn func() time.Duration
syncImageFn func(ctx context.Context, repo, reference string) error
canRetryOnErrorFn func() bool
}
func (s *mockSyncService) FetchManifest(ctx context.Context, repo, reference string) (rcManifest.Manifest, error) {
if s.fetchManifestFn != nil {
return s.fetchManifestFn(ctx, repo, reference)
}
return nil, zerr.ErrManifestNotFound
}
func (s *mockSyncService) IsStreamingForRepo(repo string) bool {
if s.isStreamingForRepoFn != nil {
return s.isStreamingForRepoFn(repo)
}
return false
}
func (s *mockSyncService) GetSyncTimeout() time.Duration {
if s.getSyncTimeoutFn != nil {
return s.getSyncTimeoutFn()
}
return 30 * time.Second
}
func (s *mockSyncService) SyncImage(ctx context.Context, repo, reference string) error {
if s.syncImageFn != nil {
return s.syncImageFn(ctx, repo, reference)
}
return nil
}
func (s *mockSyncService) CanRetryOnError() bool {
if s.canRetryOnErrorFn != nil {
return s.canRetryOnErrorFn()
}
return false
}
func (s *mockSyncService) GetNextRepo(_ string) (string, error) { return "", nil }
func (s *mockSyncService) SyncRepo(_ context.Context, _ string) error { return nil }
func (s *mockSyncService) SyncReferrers(_ context.Context, _, _ string, _ []string) error { return nil }
func (s *mockSyncService) ResetCatalog() {}
func newTestManifest(t *testing.T) rcManifest.Manifest {
t.Helper()
origMan := rcOCIV1.Manifest{
Versioned: rcOCIV1.ManifestSchemaVersion,
}
m, err := rcManifest.New(rcManifest.WithOrig(origMan))
if err != nil {
t.Fatalf("failed to create test manifest: %v", err)
}
return m
}
func TestOnDemandSetAndGetStreamManager(t *testing.T) {
Convey("StreamManager is nil before SetStreamManager is called", t, func() {
onDemand := NewOnDemand(log.NewTestLogger())
So(onDemand.StreamManager(), ShouldBeNil)
})
Convey("SetStreamManager stores the manager and StreamManager returns it", t, func() {
onDemand := NewOnDemand(log.NewTestLogger())
sm := &mockStreamManager{}
onDemand.SetStreamManager(sm)
So(onDemand.StreamManager(), ShouldEqual, sm)
})
Convey("SetStreamManager can be called multiple times and last value wins", t, func() {
onDemand := NewOnDemand(log.NewTestLogger())
sm1 := &mockStreamManager{}
sm2 := &mockStreamManager{}
onDemand.SetStreamManager(sm1)
onDemand.SetStreamManager(sm2)
So(onDemand.StreamManager(), ShouldEqual, sm2)
})
}
func TestOnDemandFetchManifestForStream(t *testing.T) {
Convey("returns cached manifest when StreamingImageManifest has image in cache", t, func() {
onDemand := NewOnDemand(log.NewTestLogger())
cached := newTestManifest(t)
sm := &mockStreamManager{
streamingImageManifestFn: func(repo, reference string) (rcManifest.Manifest, bool) {
So(repo, ShouldEqual, "myrepo")
So(reference, ShouldEqual, "v1.0")
return cached, true
},
}
onDemand.SetStreamManager(sm)
result, err := onDemand.FetchManifestForStream(context.Background(), "myrepo", "v1.0")
So(err, ShouldBeNil)
So(result, ShouldEqual, cached)
})
Convey("returns ErrBlobNotFound when no services are registered", t, func() {
onDemand := NewOnDemand(log.NewTestLogger())
sm := &mockStreamManager{
streamingImageManifestFn: func(_, _ string) (rcManifest.Manifest, bool) {
return nil, false
},
}
onDemand.SetStreamManager(sm)
result, err := onDemand.FetchManifestForStream(context.Background(), "myrepo", "v1.0")
So(errors.Is(err, zerr.ErrBlobNotFound), ShouldBeTrue)
So(result, ShouldBeNil)
})
Convey("returns ErrBlobNotFound when all services fail to fetch the manifest", t, func() {
onDemand := NewOnDemand(log.NewTestLogger())
sm := &mockStreamManager{
streamingImageManifestFn: func(_, _ string) (rcManifest.Manifest, bool) {
return nil, false
},
}
onDemand.SetStreamManager(sm)
svc := &mockSyncService{
fetchManifestFn: func(_ context.Context, _, _ string) (rcManifest.Manifest, error) {
return nil, errors.New("upstream unavailable")
},
getSyncTimeoutFn: func() time.Duration { return 5 * time.Second },
}
onDemand.Add(svc)
result, err := onDemand.FetchManifestForStream(context.Background(), "myrepo", "v1.0")
So(errors.Is(err, zerr.ErrBlobNotFound), ShouldBeTrue)
So(result, ShouldBeNil)
})
Convey("fetches manifest from service, stores it for streaming and triggers background sync", t, func() {
onDemand := NewOnDemand(log.NewTestLogger())
fetched := newTestManifest(t)
var storeRepo, storeRef string
var storedManifest rcManifest.Manifest
sm := &mockStreamManager{
streamingImageManifestFn: func(_, _ string) (rcManifest.Manifest, bool) {
return nil, false
},
storeImageForStreamingFn: func(repo, reference string, m rcManifest.Manifest) error {
storeRepo = repo
storeRef = reference
storedManifest = m
return nil
},
}
onDemand.SetStreamManager(sm)
syncCalled := make(chan struct{})
svc := &mockSyncService{
fetchManifestFn: func(_ context.Context, repo, reference string) (rcManifest.Manifest, error) {
return fetched, nil
},
getSyncTimeoutFn: func() time.Duration { return 5 * time.Second },
syncImageFn: func(_ context.Context, _, _ string) error {
close(syncCalled)
return nil
},
}
onDemand.Add(svc)
result, err := onDemand.FetchManifestForStream(context.Background(), "myrepo", "latest")
So(err, ShouldBeNil)
So(result, ShouldEqual, fetched)
So(storeRepo, ShouldEqual, "myrepo")
So(storeRef, ShouldEqual, "latest")
So(storedManifest, ShouldEqual, fetched)
select {
case <-syncCalled:
// background sync was triggered as expected
case <-time.After(2 * time.Second):
So("background sync goroutine was not triggered", ShouldBeEmpty)
}
})
Convey("stops at the first successful service and ignores later services", t, func() {
onDemand := NewOnDemand(log.NewTestLogger())
fetched := newTestManifest(t)
sm := &mockStreamManager{
streamingImageManifestFn: func(_, _ string) (rcManifest.Manifest, bool) {
return nil, false
},
}
onDemand.SetStreamManager(sm)
secondFetchCalled := false
svc1 := &mockSyncService{
fetchManifestFn: func(_ context.Context, _, _ string) (rcManifest.Manifest, error) {
return fetched, nil
},
getSyncTimeoutFn: func() time.Duration { return 5 * time.Second },
}
svc2 := &mockSyncService{
fetchManifestFn: func(_ context.Context, _, _ string) (rcManifest.Manifest, error) {
secondFetchCalled = true
return nil, errors.New("should not be reached")
},
getSyncTimeoutFn: func() time.Duration { return 5 * time.Second },
}
onDemand.Add(svc1)
onDemand.Add(svc2)
result, err := onDemand.FetchManifestForStream(context.Background(), "myrepo", "v2.0")
So(err, ShouldBeNil)
So(result, ShouldEqual, fetched)
So(secondFetchCalled, ShouldBeFalse)
})
Convey("returns error when StoreImageForStreaming fails", t, func() {
onDemand := NewOnDemand(log.NewTestLogger())
fetched := newTestManifest(t)
storeErr := errors.New("disk full")
sm := &mockStreamManager{
streamingImageManifestFn: func(_, _ string) (rcManifest.Manifest, bool) {
return nil, false
},
storeImageForStreamingFn: func(_, _ string, _ rcManifest.Manifest) error {
return storeErr
},
}
onDemand.SetStreamManager(sm)
svc := &mockSyncService{
fetchManifestFn: func(_ context.Context, _, _ string) (rcManifest.Manifest, error) {
return fetched, nil
},
getSyncTimeoutFn: func() time.Duration { return 5 * time.Second },
}
onDemand.Add(svc)
result, err := onDemand.FetchManifestForStream(context.Background(), "myrepo", "v1.0")
So(errors.Is(err, storeErr), ShouldBeTrue)
So(result, ShouldBeNil)
})
}
func TestNewClientTimeoutBehavior(t *testing.T) {
Convey("Test newClient timeout behavior", t, func() {
logger := log.NewTestLogger()
@@ -1381,3 +1688,144 @@ func TestNewClientTimeoutBehavior(t *testing.T) {
})
})
}
func TestBaseServiceIsStreamingForRepo(t *testing.T) {
streamEnabled := true
streamDisabled := false
Convey("returns false when Stream is not configured)", t, func() {
conf := syncconf.RegistryConfig{
URLs: []string{"http://localhost"},
}
service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil,
mocks.MetaDBMock{}, log.NewTestLogger())
So(err, ShouldBeNil)
So(service.IsStreamingForRepo("anyrepo"), ShouldBeFalse)
})
Convey("returns false when Stream is explicitly disabled", t, func() {
conf := syncconf.RegistryConfig{
URLs: []string{"http://localhost"},
Stream: &streamDisabled,
}
service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil,
mocks.MetaDBMock{}, log.NewTestLogger())
So(err, ShouldBeNil)
So(service.IsStreamingForRepo("anyrepo"), ShouldBeFalse)
})
Convey("returns true when streaming enabled and no content filter configured", t, func() {
conf := syncconf.RegistryConfig{
URLs: []string{"http://localhost"},
Stream: &streamEnabled,
// Content is empty — all repos allowed
}
service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil,
mocks.MetaDBMock{}, log.NewTestLogger())
So(err, ShouldBeNil)
So(service.IsStreamingForRepo("anyrepo"), ShouldBeTrue)
So(service.IsStreamingForRepo("other/repo"), ShouldBeTrue)
})
Convey("returns true when streaming enabled and repo matches content filter", t, func() {
conf := syncconf.RegistryConfig{
URLs: []string{"http://localhost"},
Stream: &streamEnabled,
Content: []syncconf.Content{{Prefix: "myrepo"}},
}
service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil,
mocks.MetaDBMock{}, log.NewTestLogger())
So(err, ShouldBeNil)
So(service.IsStreamingForRepo("myrepo"), ShouldBeTrue)
})
Convey("returns false when streaming enabled but repo does not match content filter", t, func() {
conf := syncconf.RegistryConfig{
URLs: []string{"http://localhost"},
Stream: &streamEnabled,
Content: []syncconf.Content{{Prefix: "allowed-prefix"}},
}
service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil,
mocks.MetaDBMock{}, log.NewTestLogger())
So(err, ShouldBeNil)
So(service.IsStreamingForRepo("unrelated-repo"), ShouldBeFalse)
})
}
func TestBaseServiceFetchManifestWithStreaming(t *testing.T) {
streamEnabled := true
Convey("returns ErrSyncImageFilteredOut when content filter rejects the repo", t, func() {
conf := syncconf.RegistryConfig{
URLs: []string{"http://localhost"},
Stream: &streamEnabled,
Content: []syncconf.Content{{Prefix: "upstream-prefix"}},
}
service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil,
mocks.MetaDBMock{}, log.NewTestLogger())
So(err, ShouldBeNil)
m, err := service.FetchManifest(context.Background(), "myrepo", "latest")
So(errors.Is(err, zerr.ErrSyncImageFilteredOut), ShouldBeTrue)
So(m, ShouldBeNil)
})
Convey("propagates GetImageReference error when streaming is enabled", t, func() {
conf := syncconf.RegistryConfig{
URLs: []string{"http://localhost"},
Stream: &streamEnabled,
}
service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil,
mocks.MetaDBMock{}, log.NewTestLogger())
So(err, ShouldBeNil)
getRefErr := errors.New("ref build failed")
service.remote = &mocks.SyncRemoteMock{
GetImageReferenceFn: func(repo, tag string) (ref.Ref, error) {
return ref.Ref{}, getRefErr
},
}
m, err := service.FetchManifest(context.Background(), "myrepo", "latest")
So(errors.Is(err, getRefErr), ShouldBeTrue)
So(m, ShouldBeNil)
})
Convey("propagates ManifestGet error when GetImageReference succeeds but registry is unreachable", t, func() {
conf := syncconf.RegistryConfig{
URLs: []string{"http://localhost"},
Stream: &streamEnabled,
// No content filter
}
service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil,
mocks.MetaDBMock{}, log.NewTestLogger())
So(err, ShouldBeNil)
service.remote = &mocks.SyncRemoteMock{
GetImageReferenceFn: func(repo, tag string) (ref.Ref, error) {
// Return a syntactically valid ref pointing to a non-existent host.
return ref.New("localhost:0/" + repo + ":" + tag)
},
GetHostNameFn: func() string {
return "localhost:0"
},
}
m, err := service.FetchManifest(context.Background(), "myrepo", "latest")
So(err, ShouldNotBeNil)
So(m, ShouldBeNil)
})
}