From d4f764c0f0a30308d636f6aea1595d3477d582ec Mon Sep 17 00:00:00 2001 From: Vishwas Rajashekar Date: Tue, 19 May 2026 23:56:04 +0530 Subject: [PATCH] feat(sync): add tests for ondemand, service Signed-off-by: Vishwas Rajashekar --- pkg/extensions/sync/sync_internal_test.go | 450 +++++++++++++++++++++- 1 file changed, 449 insertions(+), 1 deletion(-) diff --git a/pkg/extensions/sync/sync_internal_test.go b/pkg/extensions/sync/sync_internal_test.go index 4b8b3df9..ede4251b 100644 --- a/pkg/extensions/sync/sync_internal_test.go +++ b/pkg/extensions/sync/sync_internal_test.go @@ -8,6 +8,7 @@ import ( "encoding/json" "errors" "fmt" + "io" "net/http" "net/http/httptest" "os" @@ -19,6 +20,9 @@ import ( godigest "github.com/opencontainers/go-digest" ispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/regclient/regclient" + rcBlob "github.com/regclient/regclient/types/blob" + rcManifest "github.com/regclient/regclient/types/manifest" + rcOCIV1 "github.com/regclient/regclient/types/oci/v1" "github.com/regclient/regclient/types/ref" . "github.com/smartystreets/goconvey/convey" @@ -1274,7 +1278,310 @@ func TestDestinationRegistry(t *testing.T) { }) } -// TestNewClientTimeoutBehavior verifies that newClient creates a client that respects timeouts. +type mockStreamManager struct { + streamingImageManifestFn func(repo, reference string) (rcManifest.Manifest, bool) + storeImageForStreamingFn func(repo, reference string, m rcManifest.Manifest) error + removeStreamingImageFn func(repo, reference string) +} + +func (m *mockStreamManager) StreamingImageManifest(repo, reference string) (rcManifest.Manifest, bool) { + if m.streamingImageManifestFn != nil { + return m.streamingImageManifestFn(repo, reference) + } + + return nil, false +} + +func (m *mockStreamManager) StoreImageForStreaming(repo, reference string, man rcManifest.Manifest) error { + if m.storeImageForStreamingFn != nil { + return m.storeImageForStreamingFn(repo, reference, man) + } + + return nil +} + +func (m *mockStreamManager) RemoveStreamingImage(repo, reference string) { + if m.removeStreamingImageFn != nil { + m.removeStreamingImageFn(repo, reference) + } +} + +func (m *mockStreamManager) ConnectClient(_ string, _ io.Writer) (*InFlightBlobCopier, error) { + return nil, nil +} + +func (m *mockStreamManager) StreamingBlobReader(reader *rcBlob.BReader) (*rcBlob.BReader, error) { + return reader, nil +} + +func (m *mockStreamManager) CachedBlobInfo(_ string) (int64, string, error) { + return 0, "", nil +} + +type mockSyncService struct { + fetchManifestFn func(ctx context.Context, repo, reference string) (rcManifest.Manifest, error) + isStreamingForRepoFn func(repo string) bool + getSyncTimeoutFn func() time.Duration + syncImageFn func(ctx context.Context, repo, reference string) error + canRetryOnErrorFn func() bool +} + +func (s *mockSyncService) FetchManifest(ctx context.Context, repo, reference string) (rcManifest.Manifest, error) { + if s.fetchManifestFn != nil { + return s.fetchManifestFn(ctx, repo, reference) + } + + return nil, zerr.ErrManifestNotFound +} + +func (s *mockSyncService) IsStreamingForRepo(repo string) bool { + if s.isStreamingForRepoFn != nil { + return s.isStreamingForRepoFn(repo) + } + + return false +} + +func (s *mockSyncService) GetSyncTimeout() time.Duration { + if s.getSyncTimeoutFn != nil { + return s.getSyncTimeoutFn() + } + + return 30 * time.Second +} + +func (s *mockSyncService) SyncImage(ctx context.Context, repo, reference string) error { + if s.syncImageFn != nil { + return s.syncImageFn(ctx, repo, reference) + } + + return nil +} + +func (s *mockSyncService) CanRetryOnError() bool { + if s.canRetryOnErrorFn != nil { + return s.canRetryOnErrorFn() + } + + return false +} + +func (s *mockSyncService) GetNextRepo(_ string) (string, error) { return "", nil } +func (s *mockSyncService) SyncRepo(_ context.Context, _ string) error { return nil } +func (s *mockSyncService) SyncReferrers(_ context.Context, _, _ string, _ []string) error { return nil } +func (s *mockSyncService) ResetCatalog() {} + +func newTestManifest(t *testing.T) rcManifest.Manifest { + t.Helper() + + origMan := rcOCIV1.Manifest{ + Versioned: rcOCIV1.ManifestSchemaVersion, + } + + m, err := rcManifest.New(rcManifest.WithOrig(origMan)) + if err != nil { + t.Fatalf("failed to create test manifest: %v", err) + } + + return m +} + +func TestOnDemandSetAndGetStreamManager(t *testing.T) { + Convey("StreamManager is nil before SetStreamManager is called", t, func() { + onDemand := NewOnDemand(log.NewTestLogger()) + So(onDemand.StreamManager(), ShouldBeNil) + }) + + Convey("SetStreamManager stores the manager and StreamManager returns it", t, func() { + onDemand := NewOnDemand(log.NewTestLogger()) + sm := &mockStreamManager{} + onDemand.SetStreamManager(sm) + So(onDemand.StreamManager(), ShouldEqual, sm) + }) + + Convey("SetStreamManager can be called multiple times and last value wins", t, func() { + onDemand := NewOnDemand(log.NewTestLogger()) + sm1 := &mockStreamManager{} + sm2 := &mockStreamManager{} + onDemand.SetStreamManager(sm1) + onDemand.SetStreamManager(sm2) + So(onDemand.StreamManager(), ShouldEqual, sm2) + }) +} + +func TestOnDemandFetchManifestForStream(t *testing.T) { + Convey("returns cached manifest when StreamingImageManifest has image in cache", t, func() { + onDemand := NewOnDemand(log.NewTestLogger()) + cached := newTestManifest(t) + + sm := &mockStreamManager{ + streamingImageManifestFn: func(repo, reference string) (rcManifest.Manifest, bool) { + So(repo, ShouldEqual, "myrepo") + So(reference, ShouldEqual, "v1.0") + + return cached, true + }, + } + onDemand.SetStreamManager(sm) + + result, err := onDemand.FetchManifestForStream(context.Background(), "myrepo", "v1.0") + So(err, ShouldBeNil) + So(result, ShouldEqual, cached) + }) + + Convey("returns ErrBlobNotFound when no services are registered", t, func() { + onDemand := NewOnDemand(log.NewTestLogger()) + + sm := &mockStreamManager{ + streamingImageManifestFn: func(_, _ string) (rcManifest.Manifest, bool) { + return nil, false + }, + } + onDemand.SetStreamManager(sm) + + result, err := onDemand.FetchManifestForStream(context.Background(), "myrepo", "v1.0") + So(errors.Is(err, zerr.ErrBlobNotFound), ShouldBeTrue) + So(result, ShouldBeNil) + }) + + Convey("returns ErrBlobNotFound when all services fail to fetch the manifest", t, func() { + onDemand := NewOnDemand(log.NewTestLogger()) + + sm := &mockStreamManager{ + streamingImageManifestFn: func(_, _ string) (rcManifest.Manifest, bool) { + return nil, false + }, + } + onDemand.SetStreamManager(sm) + + svc := &mockSyncService{ + fetchManifestFn: func(_ context.Context, _, _ string) (rcManifest.Manifest, error) { + return nil, errors.New("upstream unavailable") + }, + getSyncTimeoutFn: func() time.Duration { return 5 * time.Second }, + } + onDemand.Add(svc) + + result, err := onDemand.FetchManifestForStream(context.Background(), "myrepo", "v1.0") + So(errors.Is(err, zerr.ErrBlobNotFound), ShouldBeTrue) + So(result, ShouldBeNil) + }) + + Convey("fetches manifest from service, stores it for streaming and triggers background sync", t, func() { + onDemand := NewOnDemand(log.NewTestLogger()) + fetched := newTestManifest(t) + + var storeRepo, storeRef string + var storedManifest rcManifest.Manifest + + sm := &mockStreamManager{ + streamingImageManifestFn: func(_, _ string) (rcManifest.Manifest, bool) { + return nil, false + }, + storeImageForStreamingFn: func(repo, reference string, m rcManifest.Manifest) error { + storeRepo = repo + storeRef = reference + storedManifest = m + + return nil + }, + } + onDemand.SetStreamManager(sm) + + syncCalled := make(chan struct{}) + svc := &mockSyncService{ + fetchManifestFn: func(_ context.Context, repo, reference string) (rcManifest.Manifest, error) { + return fetched, nil + }, + getSyncTimeoutFn: func() time.Duration { return 5 * time.Second }, + syncImageFn: func(_ context.Context, _, _ string) error { + close(syncCalled) + + return nil + }, + } + onDemand.Add(svc) + + result, err := onDemand.FetchManifestForStream(context.Background(), "myrepo", "latest") + So(err, ShouldBeNil) + So(result, ShouldEqual, fetched) + So(storeRepo, ShouldEqual, "myrepo") + So(storeRef, ShouldEqual, "latest") + So(storedManifest, ShouldEqual, fetched) + + select { + case <-syncCalled: + // background sync was triggered as expected + case <-time.After(2 * time.Second): + So("background sync goroutine was not triggered", ShouldBeEmpty) + } + }) + + Convey("stops at the first successful service and ignores later services", t, func() { + onDemand := NewOnDemand(log.NewTestLogger()) + fetched := newTestManifest(t) + + sm := &mockStreamManager{ + streamingImageManifestFn: func(_, _ string) (rcManifest.Manifest, bool) { + return nil, false + }, + } + onDemand.SetStreamManager(sm) + + secondFetchCalled := false + + svc1 := &mockSyncService{ + fetchManifestFn: func(_ context.Context, _, _ string) (rcManifest.Manifest, error) { + return fetched, nil + }, + getSyncTimeoutFn: func() time.Duration { return 5 * time.Second }, + } + svc2 := &mockSyncService{ + fetchManifestFn: func(_ context.Context, _, _ string) (rcManifest.Manifest, error) { + secondFetchCalled = true + + return nil, errors.New("should not be reached") + }, + getSyncTimeoutFn: func() time.Duration { return 5 * time.Second }, + } + onDemand.Add(svc1) + onDemand.Add(svc2) + + result, err := onDemand.FetchManifestForStream(context.Background(), "myrepo", "v2.0") + So(err, ShouldBeNil) + So(result, ShouldEqual, fetched) + So(secondFetchCalled, ShouldBeFalse) + }) + + Convey("returns error when StoreImageForStreaming fails", t, func() { + onDemand := NewOnDemand(log.NewTestLogger()) + fetched := newTestManifest(t) + storeErr := errors.New("disk full") + + sm := &mockStreamManager{ + streamingImageManifestFn: func(_, _ string) (rcManifest.Manifest, bool) { + return nil, false + }, + storeImageForStreamingFn: func(_, _ string, _ rcManifest.Manifest) error { + return storeErr + }, + } + onDemand.SetStreamManager(sm) + + svc := &mockSyncService{ + fetchManifestFn: func(_ context.Context, _, _ string) (rcManifest.Manifest, error) { + return fetched, nil + }, + getSyncTimeoutFn: func() time.Duration { return 5 * time.Second }, + } + onDemand.Add(svc) + + result, err := onDemand.FetchManifestForStream(context.Background(), "myrepo", "v1.0") + So(errors.Is(err, storeErr), ShouldBeTrue) + So(result, ShouldBeNil) + }) +} + func TestNewClientTimeoutBehavior(t *testing.T) { Convey("Test newClient timeout behavior", t, func() { logger := log.NewTestLogger() @@ -1381,3 +1688,144 @@ func TestNewClientTimeoutBehavior(t *testing.T) { }) }) } + +func TestBaseServiceIsStreamingForRepo(t *testing.T) { + streamEnabled := true + streamDisabled := false + + Convey("returns false when Stream is not configured)", t, func() { + conf := syncconf.RegistryConfig{ + URLs: []string{"http://localhost"}, + } + + service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil, + mocks.MetaDBMock{}, log.NewTestLogger()) + So(err, ShouldBeNil) + + So(service.IsStreamingForRepo("anyrepo"), ShouldBeFalse) + }) + + Convey("returns false when Stream is explicitly disabled", t, func() { + conf := syncconf.RegistryConfig{ + URLs: []string{"http://localhost"}, + Stream: &streamDisabled, + } + + service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil, + mocks.MetaDBMock{}, log.NewTestLogger()) + So(err, ShouldBeNil) + + So(service.IsStreamingForRepo("anyrepo"), ShouldBeFalse) + }) + + Convey("returns true when streaming enabled and no content filter configured", t, func() { + conf := syncconf.RegistryConfig{ + URLs: []string{"http://localhost"}, + Stream: &streamEnabled, + // Content is empty — all repos allowed + } + + service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil, + mocks.MetaDBMock{}, log.NewTestLogger()) + So(err, ShouldBeNil) + + So(service.IsStreamingForRepo("anyrepo"), ShouldBeTrue) + So(service.IsStreamingForRepo("other/repo"), ShouldBeTrue) + }) + + Convey("returns true when streaming enabled and repo matches content filter", t, func() { + conf := syncconf.RegistryConfig{ + URLs: []string{"http://localhost"}, + Stream: &streamEnabled, + Content: []syncconf.Content{{Prefix: "myrepo"}}, + } + + service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil, + mocks.MetaDBMock{}, log.NewTestLogger()) + So(err, ShouldBeNil) + + So(service.IsStreamingForRepo("myrepo"), ShouldBeTrue) + }) + + Convey("returns false when streaming enabled but repo does not match content filter", t, func() { + conf := syncconf.RegistryConfig{ + URLs: []string{"http://localhost"}, + Stream: &streamEnabled, + Content: []syncconf.Content{{Prefix: "allowed-prefix"}}, + } + + service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil, + mocks.MetaDBMock{}, log.NewTestLogger()) + So(err, ShouldBeNil) + + So(service.IsStreamingForRepo("unrelated-repo"), ShouldBeFalse) + }) +} + +func TestBaseServiceFetchManifestWithStreaming(t *testing.T) { + streamEnabled := true + + Convey("returns ErrSyncImageFilteredOut when content filter rejects the repo", t, func() { + conf := syncconf.RegistryConfig{ + URLs: []string{"http://localhost"}, + Stream: &streamEnabled, + Content: []syncconf.Content{{Prefix: "upstream-prefix"}}, + } + + service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil, + mocks.MetaDBMock{}, log.NewTestLogger()) + So(err, ShouldBeNil) + + m, err := service.FetchManifest(context.Background(), "myrepo", "latest") + So(errors.Is(err, zerr.ErrSyncImageFilteredOut), ShouldBeTrue) + So(m, ShouldBeNil) + }) + + Convey("propagates GetImageReference error when streaming is enabled", t, func() { + conf := syncconf.RegistryConfig{ + URLs: []string{"http://localhost"}, + Stream: &streamEnabled, + } + + service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil, + mocks.MetaDBMock{}, log.NewTestLogger()) + So(err, ShouldBeNil) + + getRefErr := errors.New("ref build failed") + service.remote = &mocks.SyncRemoteMock{ + GetImageReferenceFn: func(repo, tag string) (ref.Ref, error) { + return ref.Ref{}, getRefErr + }, + } + + m, err := service.FetchManifest(context.Background(), "myrepo", "latest") + So(errors.Is(err, getRefErr), ShouldBeTrue) + So(m, ShouldBeNil) + }) + + Convey("propagates ManifestGet error when GetImageReference succeeds but registry is unreachable", t, func() { + conf := syncconf.RegistryConfig{ + URLs: []string{"http://localhost"}, + Stream: &streamEnabled, + // No content filter + } + + service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil, + mocks.MetaDBMock{}, log.NewTestLogger()) + So(err, ShouldBeNil) + + service.remote = &mocks.SyncRemoteMock{ + GetImageReferenceFn: func(repo, tag string) (ref.Ref, error) { + // Return a syntactically valid ref pointing to a non-existent host. + return ref.New("localhost:0/" + repo + ":" + tag) + }, + GetHostNameFn: func() string { + return "localhost:0" + }, + } + + m, err := service.FetchManifest(context.Background(), "myrepo", "latest") + So(err, ShouldNotBeNil) + So(m, ShouldBeNil) + }) +}