//go:build sync package sync import ( "bytes" "context" "encoding/json" "errors" "fmt" "io" "net/http" "net/http/httptest" "os" "path" "strings" "testing" "time" godigest "github.com/opencontainers/go-digest" ispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/regclient/regclient" rcBlob "github.com/regclient/regclient/types/blob" rcManifest "github.com/regclient/regclient/types/manifest" rcOCIV1 "github.com/regclient/regclient/types/oci/v1" "github.com/regclient/regclient/types/ref" . "github.com/smartystreets/goconvey/convey" zerr "zotregistry.dev/zot/v2/errors" "zotregistry.dev/zot/v2/pkg/extensions/config" syncconf "zotregistry.dev/zot/v2/pkg/extensions/config/sync" "zotregistry.dev/zot/v2/pkg/extensions/lint" "zotregistry.dev/zot/v2/pkg/extensions/monitoring" syncConstants "zotregistry.dev/zot/v2/pkg/extensions/sync/constants" "zotregistry.dev/zot/v2/pkg/log" mTypes "zotregistry.dev/zot/v2/pkg/meta/types" "zotregistry.dev/zot/v2/pkg/storage" "zotregistry.dev/zot/v2/pkg/storage/cache" storageConstants "zotregistry.dev/zot/v2/pkg/storage/constants" "zotregistry.dev/zot/v2/pkg/storage/local" . "zotregistry.dev/zot/v2/pkg/test/image-utils" "zotregistry.dev/zot/v2/pkg/test/mocks" ) func TestService(t *testing.T) { Convey("trigger fetch tags error", t, func() { conf := syncconf.RegistryConfig{ URLs: []string{"http://localhost"}, } service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil, mocks.MetaDBMock{}, log.NewTestLogger()) So(err, ShouldBeNil) err = service.SyncRepo(context.Background(), "repo") So(err, ShouldNotBeNil) }) Convey("test context cancellation in SyncRepo without mock", t, func() { conf := syncconf.RegistryConfig{ URLs: []string{"http://localhost"}, } service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil, mocks.MetaDBMock{}, log.NewTestLogger()) So(err, ShouldBeNil) // Create a context that's already cancelled ctx, cancel := context.WithCancel(context.Background()) cancel() // Cancel immediately err = service.SyncRepo(ctx, "repo") So(err, ShouldNotBeNil) // This will fail at getTags before reaching the cancellation check }) Convey("test context cancellation in SyncRepo with mock", t, func() { conf := syncconf.RegistryConfig{ URLs: []string{"http://localhost"}, } service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil, mocks.MetaDBMock{}, log.NewTestLogger()) So(err, ShouldBeNil) // Create a mock remote that returns tags so we can reach the loop mockRemote := &mocks.SyncRemoteMock{ GetTagsFn: func(ctx context.Context, repo string) ([]string, error) { return []string{"tag1", "tag2", "tag3"}, nil }, } service.remote = mockRemote // Create a context that's already cancelled ctx, cancel := context.WithCancel(context.Background()) cancel() // Cancel immediately err = service.SyncRepo(ctx, "repo") So(err, ShouldNotBeNil) So(errors.Is(err, context.Canceled), ShouldBeTrue) }) Convey("test SyncReferrers ReferrerList error", t, func() { // SyncReferrers is the on-demand API; it calls syncReferrers(..., recursive=false) // so only direct referrers are synced (no referrers-of-referrers). conf := syncconf.RegistryConfig{ URLs: []string{"http://localhost"}, } service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil, mocks.MetaDBMock{}, log.NewTestLogger()) So(err, ShouldBeNil) // Create a minimal mock remote that only returns tags mockRemote := &mocks.SyncRemoteMock{ GetTagsFn: func(ctx context.Context, repo string) ([]string, error) { return []string{"tag1"}, nil }, } service.remote = mockRemote // Set rc to nil to force a panic at ReferrerList call service.rc = nil // Use defer to catch the panic - this confirms we reached the ReferrerList call var panicOccurred bool defer func() { if r := recover(); r != nil { panicOccurred = true t.Logf("SyncReferrers panic (expected): %v", r) } }() ctx := context.Background() err = service.SyncReferrers(ctx, "repo", "tag1", []string{"signature"}) // We expect a panic when rc is nil, which confirms we reached the ReferrerList call So(panicOccurred, ShouldBeTrue) }) Convey("test syncImage skips OCI conversion but still commits when image already synced", t, func() { conf := syncconf.RegistryConfig{ URLs: []string{"http://localhost"}, } service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil, mocks.MetaDBMock{}, log.NewTestLogger()) So(err, ShouldBeNil) // Mock remote returns isConverted=true so OCI conversion would be attempted if not skipped mockRemote := &mocks.SyncRemoteMock{ GetImageReferenceFn: func(repo string, tag string) (ref.Ref, error) { return ref.New("mock-registry/" + repo + ":" + tag) }, GetOCIDigestFn: func(ctx context.Context, repo, tag string) (godigest.Digest, godigest.Digest, bool, error) { // isConverted=true means OCI conversion would be attempted return godigest.Digest("sha256:abc123"), godigest.Digest("sha256:def456"), true, nil }, } service.remote = mockRemote commitAllCalled := false // Mock destination returns CanSkipImage=true (already synced) mockDest := &mocks.SyncDestinationMock{ GetImageReferenceFn: func(repo string, tag string) (ref.Ref, error) { return ref.New("local/" + repo + ":" + tag) }, CanSkipImageFn: func(repo string, tag string, digest godigest.Digest) (bool, error) { return true, nil }, CommitAllFn: func(repo string, imageReference ref.Ref) error { commitAllCalled = true return nil }, CleanupImageFn: func(imageReference ref.Ref, repo string) error { return nil }, } service.destination = mockDest ctx := context.Background() err = service.syncImage(ctx, "localrepo", "remoterepo", "tag1", []string{}, false) // Should succeed without error So(err, ShouldBeNil) // CommitAll should still be called to persist any new referrers even when image was skipped So(commitAllCalled, ShouldBeTrue) // OCI conversion is NOT called because skipped=true guards it. // If mod.Apply were called, it would fail since service.rc is nil - proving the guard works. }) Convey("test syncImage ReferrerList error with OnlySigned", t, func() { onlySigned := true conf := syncconf.RegistryConfig{ URLs: []string{"http://localhost"}, OnlySigned: &onlySigned, } service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil, mocks.MetaDBMock{}, log.NewTestLogger()) So(err, ShouldBeNil) // Create a mock remote that returns an invalid reference to trigger ReferrerList error mockRemote := &mocks.SyncRemoteMock{ GetImageReferenceFn: func(repo string, tag string) (ref.Ref, error) { // Return an invalid reference that will cause ReferrerList to fail with "ref is not set" error return ref.Ref{}, nil }, GetDigestFn: func(ctx context.Context, repo, tag string) (godigest.Digest, error) { return godigest.Digest("sha256:abc123"), nil }, } service.remote = mockRemote // Create a mock destination mockDest := &mocks.SyncDestinationMock{ GetImageReferenceFn: func(repo string, tag string) (ref.Ref, error) { return ref.New("local/" + repo + ":" + tag) }, } service.destination = mockDest ctx := context.Background() err = service.syncImage(ctx, "localrepo", "remoterepo", "tag1", []string{}, true) // We expect an error when ReferrerList fails with "ref is not set" error So(err, ShouldNotBeNil) So(err.Error(), ShouldContainSubstring, "ref is not set") }) Convey("test syncReferrers ReferrerList error", t, func() { conf := syncconf.RegistryConfig{ URLs: []string{"http://localhost"}, } service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil, mocks.MetaDBMock{}, log.NewTestLogger()) So(err, ShouldBeNil) // Create a mock remote that returns valid references mockRemote := &mocks.SyncRemoteMock{ GetImageReferenceFn: func(repo string, tag string) (ref.Ref, error) { return ref.New(repo + ":" + tag) }, GetDigestFn: func(ctx context.Context, repo, tag string) (godigest.Digest, error) { return godigest.Digest("sha256:abc123"), nil }, } service.remote = mockRemote // Create a mock destination mockDest := &mocks.SyncDestinationMock{ GetImageReferenceFn: func(repo string, tag string) (ref.Ref, error) { return ref.New("local/" + repo + ":" + tag) }, } service.destination = mockDest ctx := context.Background() localImageRef, err := ref.New("local/repo:tag") So(err, ShouldBeNil) // Create an invalid remote reference that will cause ReferrerList to fail with "ref is not set" error remoteImageRef := ref.Ref{} err = service.syncReferrers(ctx, []string{"tag"}, "localrepo", "remoterepo", localImageRef, remoteImageRef, false) // The error should be "ref is not set" as defined in regclient ReferrerList function So(err, ShouldNotBeNil) So(err.Error(), ShouldContainSubstring, "ref is not set") err = service.syncReferrers(ctx, []string{"tag"}, "localrepo", "remoterepo", localImageRef, remoteImageRef, true) // The error should be "ref is not set" as defined in regclient ReferrerList function So(err, ShouldNotBeNil) So(err.Error(), ShouldContainSubstring, "ref is not set") }) Convey("test LoadOrStore continue path by pre-populating requestStore", t, func() { // Strategy: Pre-populate requestStore to force LoadOrStore to return true maxRetries := 2 retryDelay := 100 * time.Millisecond conf := syncconf.RegistryConfig{ URLs: []string{"http://localhost:32768"}, // Invalid port to force errors MaxRetries: &maxRetries, RetryDelay: &retryDelay, } service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil, mocks.MetaDBMock{}, log.NewTestLogger()) So(err, ShouldBeNil) onDemand := NewOnDemand(log.NewTestLogger()) onDemand.Add(service) ctx := context.Background() // Step 1: Verify empty requestStore initially initialImageCount := 0 onDemand.requestStore.Range(func(key, value any) bool { initialImageCount++ return true }) So(initialImageCount, ShouldEqual, 0) // Step 2: Pre-populate image background retry request duplicateImageReq := request{ repo: "test-duplicate-repo", reference: "test-duplicate-tag", serviceID: 0, isBackground: true, } onDemand.requestStore.Store(duplicateImageReq, struct{}{}) // Step 3: Verify we now have 1 request preImageCount := 0 onDemand.requestStore.Range(func(key, value any) bool { preImageCount++ return true }) So(preImageCount, ShouldEqual, 1) // Should be 1 after pre-population // Step 4: Trigger sync - should execute continue path err = onDemand.SyncImage(ctx, "test-duplicate-repo", "test-duplicate-tag") So(err, ShouldNotBeNil) // Should still error due to invalid registry // Step 5: Verify CONTINUE PATH EXECUTED - no new requests created postImageCount := 0 onDemand.requestStore.Range(func(key, value any) bool { postImageCount++ return true }) So(postImageCount, ShouldEqual, preImageCount) // Count unchanged = continue executed // Step 6: Verify image request unchanged (proves no background goroutine started) value, exists := onDemand.requestStore.Load(duplicateImageReq) So(exists, ShouldBeTrue) So(value, ShouldEqual, struct{}{}) // Should still be pre-populated value // Step 7: Verify current state before referrer test - we should have 1 request initialReferrerCount := 0 onDemand.requestStore.Range(func(key, value any) bool { initialReferrerCount++ return true }) So(initialReferrerCount, ShouldEqual, 1) // Should have 1 from image test // Step 8: Pre-populate referrer background retry request duplicateReferrerReq := request{ repo: "test-duplicate-referrer-repo", reference: "sha256:duplicate", serviceID: 0, isBackground: true, } onDemand.requestStore.Store(duplicateReferrerReq, struct{}{}) // Step 9: Verify we now have 2 requests (image + referrer) preReferrerCount := 0 onDemand.requestStore.Range(func(key, value any) bool { preReferrerCount++ return true }) So(preReferrerCount, ShouldEqual, 2) // Should have 2 after referrer pre-population // Step 10: Trigger referrer sync - should execute continue path err = onDemand.SyncReferrers(ctx, "test-duplicate-referrer-repo", "sha256:duplicate", []string{"signature"}) So(err, ShouldNotBeNil) // Should still error due to invalid registry // Step 11: Verify CONTINUE PATH EXECUTED - no new referrer requests created postReferrerCount := 0 onDemand.requestStore.Range(func(key, value any) bool { postReferrerCount++ return true }) So(postReferrerCount, ShouldEqual, preReferrerCount) // Count unchanged = continue executed // Step 12: Verify referrer request unchanged (proves no background goroutine started) value, exists = onDemand.requestStore.Load(duplicateReferrerReq) So(exists, ShouldBeTrue) So(value, ShouldEqual, struct{}{}) // Should still be pre-populated value // Step 13: Final verification - exactly 2 requests total (both pre-populated, none deleted) finalCount := 0 onDemand.requestStore.Range(func(key, value any) bool { finalCount++ return true }) So(finalCount, ShouldEqual, 2) }) Convey("test continue paths for specific error types in on-demand sync", t, func() { // Strategy: Create multiple services where one returns ErrSyncImageFilteredOut // This will trigger the continue path when looping through services ctx := context.Background() // Create first service with content filtering maxRetries := 2 retryDelay := 100 * time.Millisecond conf1 := syncconf.RegistryConfig{ URLs: []string{"http://localhost:32768"}, // Invalid port to force errors MaxRetries: &maxRetries, RetryDelay: &retryDelay, Content: []syncconf.Content{{ Prefix: "different-prefix", // Won't match our test repo }}, } service1, err := New(conf1, "", nil, t.TempDir(), storage.StoreController{}, nil, mocks.MetaDBMock{}, log.NewTestLogger()) So(err, ShouldBeNil) // Create second service for normal processing conf2 := syncconf.RegistryConfig{ URLs: []string{"http://localhost:32768"}, // Invalid port to force errors MaxRetries: &maxRetries, RetryDelay: &retryDelay, Content: []syncconf.Content{{ Prefix: "test-repo", // Will match our test repo }}, } service2, err := New(conf2, "", nil, t.TempDir(), storage.StoreController{}, nil, mocks.MetaDBMock{}, log.NewTestLogger()) So(err, ShouldBeNil) onDemand := NewOnDemand(log.NewTestLogger()) onDemand.Add(service1) // First service will return ErrSyncImageFilteredOut onDemand.Add(service2) // Second service will process normally but error on network // Test ErrSyncImageFilteredOut continue path in SyncImage // The first service returns ErrSyncImageFilteredOut (causes continue), second service processes but errors err = onDemand.SyncImage(ctx, "test-filtered-repo", "test-tag") So(err, ShouldNotBeNil) // Should get error from second service (network error) // Test ErrSyncImageFilteredOut continue path in SyncReferrers err = onDemand.SyncReferrers(ctx, "test-filtered-referrer-repo", "sha256:test", []string{"signature"}) So(err, ShouldNotBeNil) // Should get error from second service (network error) }) Convey("test continue paths in both SyncImage and SyncReferrers", t, func() { // Helper function to create filter test scenario createFilteredService := func(prefix string) (*BaseService, *BaseOnDemand) { maxRetries := 2 retryDelay := 100 * time.Millisecond conf := syncconf.RegistryConfig{ URLs: []string{"http://localhost:32768"}, // Invalid port to force errors MaxRetries: &maxRetries, RetryDelay: &retryDelay, Content: []syncconf.Content{{ Prefix: prefix, // Won't match our test repo }}, } service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil, mocks.MetaDBMock{}, log.NewTestLogger()) So(err, ShouldBeNil) onDemand := NewOnDemand(log.NewTestLogger()) onDemand.Add(service) return service, onDemand } ctx := context.Background() // Test SyncReferrers continue path Convey("SyncReferrers continue path", func() { _, onDemand := createFilteredService("different-referrer-prefix") err := onDemand.SyncReferrers(ctx, "test-unfiltered-referrer-repo", "sha256:test", []string{"signature"}) So(err, ShouldNotBeNil) // Should get error from sync process (filtered out -> continue -> network error) }) // Test SyncImage continue path Convey("SyncImage continue path", func() { _, onDemand := createFilteredService("different-image-prefix") err := onDemand.SyncImage(ctx, "test-unfiltered-image-repo", "test-tag") So(err, ShouldNotBeNil) // Should get error from sync process (filtered out -> continue -> network error) }) }) Convey("test background retry goroutines for both SyncImage and SyncReferrers", t, func() { // Helper function to create background retry test scenario createBackgroundRetryService := func(prefix string) (*BaseService, *BaseOnDemand, time.Duration) { maxRetries := 2 retryDelay := 1 * time.Second conf := syncconf.RegistryConfig{ URLs: []string{"http://localhost:32768"}, // Invalid port to force errors MaxRetries: &maxRetries, // Enable retries so CanRetryOnError() returns true RetryDelay: &retryDelay, Content: []syncconf.Content{{ Prefix: prefix, // Will match our repo }}, } service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil, mocks.MetaDBMock{}, log.NewTestLogger()) So(err, ShouldBeNil) onDemand := NewOnDemand(log.NewTestLogger()) onDemand.Add(service) return service, onDemand, retryDelay } // Test SyncReferrers background retry Convey("SyncReferrers background retry", func() { _, onDemand, retryDelay := createBackgroundRetryService("test-background-retry") ctx := context.Background() // Verify initial requestStore is empty initialCount := 0 onDemand.requestStore.Range(func(key, value any) bool { initialCount++ return true }) So(initialCount, ShouldEqual, 0) // Call SyncReferrers - should trigger background retry since: // 1. Network error (not a continue-path error) // 2. CanRetryOnError() returns true (maxRetries > 0) // 3. No existing background retry err := onDemand.SyncReferrers(ctx, "test-background-retry", "sha256:background", []string{"signature"}) So(err, ShouldNotBeNil) // Should get original network error // Wait for background goroutine to start and store request time.Sleep(50 * time.Millisecond) // Verify background retry request was stored reqBackground := request{ repo: "test-background-retry", reference: "sha256:background", serviceID: 0, isBackground: true, } _, existsBackground := onDemand.requestStore.Load(reqBackground) So(existsBackground, ShouldBeTrue) // Background retry request should exist // Wait for background retry to complete and cleanup (3x retry delay) time.Sleep(3 * retryDelay) // Verify background retry request was cleaned up after completion _, existsAfterCleanup := onDemand.requestStore.Load(reqBackground) So(existsAfterCleanup, ShouldBeFalse) // Should be cleaned up by defer function after retry }) // Test SyncImage background retry Convey("SyncImage background retry", func() { _, onDemand, retryDelay := createBackgroundRetryService("test-background-image-retry") ctx := context.Background() // Verify initial requestStore is empty initialCount := 0 onDemand.requestStore.Range(func(key, value any) bool { initialCount++ return true }) So(initialCount, ShouldEqual, 0) // Call SyncImage - should trigger background retry since: // 1. Network error (not a continue-path error) // 2. CanRetryOnError() returns true (maxRetries > 0) // 3. No existing background retry err := onDemand.SyncImage(ctx, "test-background-image-retry", "background-image-tag") So(err, ShouldNotBeNil) // Should get original network error // Wait for background goroutine to start and store request time.Sleep(50 * time.Millisecond) // Verify background retry request was stored reqBackground := request{ repo: "test-background-image-retry", reference: "background-image-tag", serviceID: 0, isBackground: true, } _, existsBackground := onDemand.requestStore.Load(reqBackground) So(existsBackground, ShouldBeTrue) // Background retry request should exist // Wait for background retry to complete and cleanup (3x retry delay) time.Sleep(3 * retryDelay) // Verify background retry request was cleaned up after completion _, existsAfterCleanup := onDemand.requestStore.Load(reqBackground) So(existsAfterCleanup, ShouldBeFalse) // Should be cleaned up by defer function after retry }) }) Convey("test assured channel waiting path", t, func() { // Strategy: Pre-populate requestStore with a channel to GUARANTEE the channel waiting code path // This ensures the "waiting on channel" message is logged and the channel receive happens Convey("SyncImage assured channel waiting", func() { onDemand := NewOnDemand(log.NewTestLogger()) // Create request and pre-populate with a channel that we control req := request{ repo: "test-guaranteed-channel-image", reference: "guaranteed-image-tag", serviceID: 0, isBackground: false, } // Create a channel that we control completely pendingChannel := make(chan error, 1) onDemand.requestStore.Store(req, pendingChannel) // Start request that will wait on our channel requestCompleted := make(chan error) go func() { err := onDemand.SyncImage(context.Background(), "test-guaranteed-channel-image", "guaranteed-image-tag") requestCompleted <- err }() // Wait a moment for the request to reach the channel waiting code time.Sleep(50 * time.Millisecond) // Send error through our controlled channel - this proves channel waiting worked pendingChannel <- errors.New("guaranteed channel error") // Verify the request got our controlled error err := <-requestCompleted So(err, ShouldNotBeNil) So(err.Error(), ShouldEqual, "guaranteed channel error") }) Convey("SyncReferrers assured channel waiting", func() { onDemand := NewOnDemand(log.NewTestLogger()) // Create request and pre-populate with a channel that we control req := request{ repo: "test-guaranteed-channel-referrers", reference: "sha256:guaranteed", serviceID: 0, isBackground: false, } // Create a channel that we control completely pendingChannel := make(chan error, 1) onDemand.requestStore.Store(req, pendingChannel) // Start request that will wait on our channel requestCompleted := make(chan error) go func() { err := onDemand.SyncReferrers(context.Background(), "test-guaranteed-channel-referrers", "sha256:guaranteed", []string{"signature"}) requestCompleted <- err }() // Wait a moment for the request to reach the channel waiting code time.Sleep(50 * time.Millisecond) // Send error through our controlled channel - this proves channel waiting worked pendingChannel <- errors.New("guaranteed referrer channel error") // Verify the request got our controlled error err := <-requestCompleted So(err, ShouldNotBeNil) So(err.Error(), ShouldEqual, "guaranteed referrer channel error") }) }) } func TestSyncLegacyCosignTagsSyncReferrers(t *testing.T) { Convey("SyncLegacyCosignTags=false skips getTags and the digest-tag loop in SyncReferrers", t, func() { getTagsCallCount := 0 syncLegacyFalse := false conf := syncconf.RegistryConfig{ URLs: []string{"http://localhost"}, SyncLegacyCosignTags: &syncLegacyFalse, } service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil, mocks.MetaDBMock{}, log.NewTestLogger()) So(err, ShouldBeNil) service.rc = regclient.New() mockRemote := &mocks.SyncRemoteMock{ GetTagsFn: func(ctx context.Context, repo string) ([]string, error) { getTagsCallCount++ return nil, errors.New("getTags must not be called when SyncLegacyCosignTags=false") }, GetImageReferenceFn: func(repo, tag string) (ref.Ref, error) { return ref.New(repo + "@" + tag) }, GetDigestFn: func(ctx context.Context, repo, tag string) (godigest.Digest, error) { return godigest.Digest("sha256:" + strings.Repeat("a", 64)), nil }, } service.remote = mockRemote mockDest := &mocks.SyncDestinationMock{ GetImageReferenceFn: func(repo, tag string) (ref.Ref, error) { return ref.New("local/" + repo + "@" + tag) }, CommitAllFn: func(repo string, imageReference ref.Ref) error { return nil }, } service.destination = mockDest ctx := context.Background() digest := "sha256:" + strings.Repeat("a", 64) err = service.SyncReferrers(ctx, "repo", digest, nil) // We expect an error from rc.ReferrerList since it's not connected to a registry, // but what we really care about is that getTags was NOT called. So(getTagsCallCount, ShouldEqual, 0) }) Convey("SyncLegacyCosignTags=true (default) calls getTags in SyncReferrers", t, func() { getTagsCallCount := 0 syncLegacyTrue := true conf := syncconf.RegistryConfig{ URLs: []string{"http://localhost"}, SyncLegacyCosignTags: &syncLegacyTrue, } service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil, mocks.MetaDBMock{}, log.NewTestLogger()) So(err, ShouldBeNil) service.rc = regclient.New() mockRemote := &mocks.SyncRemoteMock{ GetTagsFn: func(ctx context.Context, repo string) ([]string, error) { getTagsCallCount++ return []string{"tag1"}, nil }, GetImageReferenceFn: func(repo, tag string) (ref.Ref, error) { return ref.New(repo + "@" + tag) }, } service.remote = mockRemote mockDest := &mocks.SyncDestinationMock{ GetImageReferenceFn: func(repo, tag string) (ref.Ref, error) { return ref.New("local/" + repo + "@" + tag) }, } service.destination = mockDest ctx := context.Background() digest := "sha256:" + strings.Repeat("a", 64) err = service.SyncReferrers(ctx, "repo", digest, nil) // We don't care if it fails, only that getTags was called. So(getTagsCallCount, ShouldEqual, 1) }) } func TestOnDemandSyncReferrersNonRecursive(t *testing.T) { Convey("SyncReferrers (on-demand) uses non-recursive sync", t, func() { // Verify that recursive calls are not made. // We use a non-nil regclient to avoid panics. syncLegacyFalse := false conf := syncconf.RegistryConfig{ URLs: []string{"http://localhost"}, SyncLegacyCosignTags: &syncLegacyFalse, } service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil, mocks.MetaDBMock{}, log.NewTestLogger()) So(err, ShouldBeNil) service.rc = regclient.New() mockRemote := &mocks.SyncRemoteMock{ GetImageReferenceFn: func(repo, tag string) (ref.Ref, error) { return ref.New(repo + "@" + tag) }, GetDigestFn: func(ctx context.Context, repo, tag string) (godigest.Digest, error) { return godigest.Digest("sha256:" + strings.Repeat("a", 64)), nil }, GetTagsFn: func(ctx context.Context, repo string) ([]string, error) { return []string{}, nil }, } service.remote = mockRemote mockDest := &mocks.SyncDestinationMock{ GetImageReferenceFn: func(repo, tag string) (ref.Ref, error) { return ref.New("local/" + repo + "@" + tag) }, } service.destination = mockDest ctx := context.Background() digest := "sha256:" + strings.Repeat("a", 64) err = service.SyncReferrers(ctx, "repo", digest, nil) // It will fail because rc is not connected, but it shouldn't panic. So(err, ShouldNotBeNil) }) } func TestDestinationRegistry(t *testing.T) { Convey("make StoreController", t, func() { dir := t.TempDir() log := log.NewTestLogger() metrics := monitoring.NewMetricsServer(false, log) defer metrics.Stop() // Clean up metrics server to prevent resource leaks cacheDriver, _ := storage.Create("boltdb", cache.BoltDBDriverParameters{ RootDir: dir, Name: "cache", UseRelPaths: true, }, log) syncImgStore := local.NewImageStore(dir, true, true, log, metrics, nil, cacheDriver, nil, nil) repoName := "repo" storeController := storage.StoreController{DefaultStore: syncImgStore} registry := NewDestinationRegistry(storeController, storeController, nil, log) imageReference, err := registry.GetImageReference(repoName, "1.0") So(err, ShouldBeNil) So(imageReference, ShouldNotBeNil) imgStore := getImageStoreFromImageReference(repoName, imageReference, log) // create a blob/layer upload, err := imgStore.NewBlobUpload(repoName) So(err, ShouldBeNil) So(upload, ShouldNotBeEmpty) content := []byte("this is a blob1") buf := bytes.NewBuffer(content) buflen := buf.Len() digest := godigest.FromBytes(content) So(digest, ShouldNotBeNil) blob, err := imgStore.PutBlobChunkStreamed(repoName, upload, buf) So(err, ShouldBeNil) So(blob, ShouldEqual, buflen) bdgst1 := digest bsize1 := len(content) err = imgStore.FinishBlobUpload(repoName, upload, buf, digest) So(err, ShouldBeNil) So(blob, ShouldEqual, buflen) // push index image var index ispec.Index index.SchemaVersion = 2 index.MediaType = ispec.MediaTypeImageIndex for i := 0; i < 4; i++ { // upload image config blob upload, err := imgStore.NewBlobUpload(repoName) So(err, ShouldBeNil) So(upload, ShouldNotBeEmpty) cblob, cdigest := GetRandomImageConfig() buf := bytes.NewBuffer(cblob) buflen := buf.Len() blob, err := imgStore.PutBlobChunkStreamed(repoName, upload, buf) So(err, ShouldBeNil) So(blob, ShouldEqual, buflen) err = imgStore.FinishBlobUpload(repoName, upload, buf, cdigest) So(err, ShouldBeNil) So(blob, ShouldEqual, buflen) // create a manifest manifest := ispec.Manifest{ Config: ispec.Descriptor{ MediaType: ispec.MediaTypeImageConfig, Digest: cdigest, Size: int64(len(cblob)), }, Layers: []ispec.Descriptor{ { MediaType: ispec.MediaTypeImageLayer, Digest: bdgst1, Size: int64(bsize1), }, }, } manifest.SchemaVersion = 2 content, err = json.Marshal(manifest) So(err, ShouldBeNil) digest = godigest.FromBytes(content) So(digest, ShouldNotBeNil) _, _, err = imgStore.PutImageManifest(repoName, digest.String(), ispec.MediaTypeImageManifest, content, nil) So(err, ShouldBeNil) index.Manifests = append(index.Manifests, ispec.Descriptor{ Digest: digest, MediaType: ispec.MediaTypeImageManifest, Size: int64(len(content)), }) } // upload index image indexContent, err := json.Marshal(index) So(err, ShouldBeNil) indexDigest := godigest.FromBytes(indexContent) So(indexDigest, ShouldNotBeNil) _, _, err = imgStore.PutImageManifest(repoName, "1.0", ispec.MediaTypeImageIndex, indexContent, nil) So(err, ShouldBeNil) Convey("sync index image", func() { ok, err := registry.CanSkipImage(repoName, "1.0", indexDigest) So(ok, ShouldBeFalse) So(err, ShouldBeNil) err = registry.CommitAll(repoName, imageReference) So(err, ShouldBeNil) }) Convey("CleanupImage()", func() { ok, err := registry.CanSkipImage(repoName, "1.0", indexDigest) So(ok, ShouldBeFalse) So(err, ShouldBeNil) err = registry.CommitAll(repoName, imageReference) So(err, ShouldBeNil) err = registry.CleanupImage(imageReference, repoName) So(err, ShouldBeNil) }) Convey("trigger GetImageManifest error in CommitImage()", func() { err = os.Chmod(imgStore.BlobPath(repoName, indexDigest), 0o000) So(err, ShouldBeNil) err = registry.CommitAll(repoName, imageReference) So(err, ShouldNotBeNil) }) Convey("trigger linter error in CommitImage()", func() { defaultVal := true linter := lint.NewLinter(&config.LintConfig{ BaseConfig: config.BaseConfig{ Enable: &defaultVal, }, MandatoryAnnotations: []string{"annot1"}, }, log) syncImgStore := local.NewImageStore(dir, true, true, log, metrics, linter, cacheDriver, nil, nil) repoName := "repo" storeController := storage.StoreController{DefaultStore: syncImgStore} registry := NewDestinationRegistry(storeController, storeController, nil, log) err = registry.CommitAll(repoName, imageReference) So(err, ShouldBeNil) }) Convey("trigger GetBlobContent on manifest error in CommitImage()", func() { err = os.Chmod(imgStore.BlobPath(repoName, digest), 0o000) So(err, ShouldBeNil) err = registry.CommitAll(repoName, imageReference) So(err, ShouldNotBeNil) }) Convey("trigger copyBlob() error in CommitImage()", func() { err = os.Chmod(imgStore.BlobPath(repoName, bdgst1), 0o000) So(err, ShouldBeNil) err = registry.CommitAll(repoName, imageReference) So(err, ShouldNotBeNil) }) Convey("trigger PutImageManifest error on index manifest in CommitImage()", func() { err = os.MkdirAll(syncImgStore.BlobPath(repoName, indexDigest), storageConstants.DefaultDirPerms) So(err, ShouldBeNil) err = os.Chmod(syncImgStore.BlobPath(repoName, indexDigest), 0o000) So(err, ShouldBeNil) err = registry.CommitAll(repoName, imageReference) So(err, ShouldNotBeNil) }) Convey("trigger metaDB error on index manifest in CommitImage()", func() { storeController := storage.StoreController{DefaultStore: syncImgStore} registry := NewDestinationRegistry(storeController, storeController, mocks.MetaDBMock{ SetRepoReferenceFn: func(ctx context.Context, repo string, reference string, imageMeta mTypes.ImageMeta) error { if reference == "1.0" { return zerr.ErrRepoMetaNotFound } return nil }, }, log) err = registry.CommitAll(repoName, imageReference) So(err, ShouldNotBeNil) }) Convey("trigger metaDB error on image manifest in CommitImage()", func() { storeController := storage.StoreController{DefaultStore: syncImgStore} registry := NewDestinationRegistry(storeController, storeController, mocks.MetaDBMock{ SetRepoReferenceFn: func(ctx context.Context, repo, reference string, imageMeta mTypes.ImageMeta) error { return zerr.ErrRepoMetaNotFound }, }, log) err = registry.CommitAll(repoName, imageReference) So(err, ShouldNotBeNil) }) Convey("trigger GetBlobContent error on manifest within image index in copyManifest()", func() { // This test specifically targets the error where GetBlobContent fails for a manifest // that is part of an image index. // Create a destination registry using the existing syncImgStore as temp storage storeController := storage.StoreController{DefaultStore: syncImgStore} registry := NewDestinationRegistry(storeController, storeController, nil, log) // Get an image reference - this will create a temp session directory imageReference, err := registry.GetImageReference(repoName, "test-index") So(err, ShouldBeNil) // Get the temp image store from the image reference tempImgStore := getImageStoreFromImageReference(repoName, imageReference, log) // Create an image index with multiple manifests var index ispec.Index index.SchemaVersion = 2 index.MediaType = ispec.MediaTypeImageIndex // Create child manifests for i := 0; i < 2; i++ { // Create blob content content := []byte(fmt.Sprintf("this is blob %d", i)) buf := bytes.NewBuffer(content) buflen := buf.Len() digest := godigest.FromBytes(content) So(digest, ShouldNotBeNil) // Upload blob upload, err := tempImgStore.NewBlobUpload(repoName) So(err, ShouldBeNil) So(upload, ShouldNotBeEmpty) blob, err := tempImgStore.PutBlobChunkStreamed(repoName, upload, buf) So(err, ShouldBeNil) So(blob, ShouldEqual, buflen) err = tempImgStore.FinishBlobUpload(repoName, upload, buf, digest) So(err, ShouldBeNil) // Create config blob cblob := []byte(fmt.Sprintf(`{"architecture":"amd64","os":"linux","config":{"User":"test%d"}}`, i)) cdigest := godigest.FromBytes(cblob) So(cdigest, ShouldNotBeNil) upload, err = tempImgStore.NewBlobUpload(repoName) So(err, ShouldBeNil) So(upload, ShouldNotBeEmpty) cbuf := bytes.NewBuffer(cblob) blob, err = tempImgStore.PutBlobChunkStreamed(repoName, upload, cbuf) So(err, ShouldBeNil) So(blob, ShouldEqual, len(cblob)) err = tempImgStore.FinishBlobUpload(repoName, upload, cbuf, cdigest) So(err, ShouldBeNil) // Create a manifest manifest := ispec.Manifest{ Config: ispec.Descriptor{ MediaType: ispec.MediaTypeImageConfig, Digest: cdigest, Size: int64(len(cblob)), }, Layers: []ispec.Descriptor{ { MediaType: ispec.MediaTypeImageLayer, Digest: digest, Size: int64(buflen), }, }, } manifest.SchemaVersion = 2 manifestContent, err := json.Marshal(manifest) So(err, ShouldBeNil) manifestDigest := godigest.FromBytes(manifestContent) So(manifestDigest, ShouldNotBeNil) // Store the manifest in the temp image store _, _, err = tempImgStore.PutImageManifest(repoName, manifestDigest.String(), ispec.MediaTypeImageManifest, manifestContent, nil) So(err, ShouldBeNil) // Add to index index.Manifests = append(index.Manifests, ispec.Descriptor{ Digest: manifestDigest, MediaType: ispec.MediaTypeImageManifest, Size: int64(len(manifestContent)), }) } // Create the index manifest indexContent, err := json.Marshal(index) So(err, ShouldBeNil) indexDigest := godigest.FromBytes(indexContent) So(indexDigest, ShouldNotBeNil) // Store the index manifest in the temp image store _, _, err = tempImgStore.PutImageManifest(repoName, indexDigest.String(), ispec.MediaTypeImageIndex, indexContent, nil) So(err, ShouldBeNil) // Now remove one of the child manifest blobs to trigger the error childManifestDigest := index.Manifests[1].Digest err = os.Remove(tempImgStore.BlobPath(repoName, childManifestDigest)) So(err, ShouldBeNil) // Create a descriptor for the index manifest desc := ispec.Descriptor{ Digest: indexDigest, MediaType: ispec.MediaTypeImageIndex, Size: int64(len(indexContent)), } // Initialize the seen slice seen := &[]godigest.Digest{} // Call copyManifest directly with the index manifest - this should trigger the error path at lines 234-239 // when it tries to get blob content for the child manifest with the removed blob err = registry.(*DestinationRegistry).copyManifest(repoName, desc, indexDigest.String(), tempImgStore, seen) // Verify the error is returned and contains the expected message So(err, ShouldNotBeNil) So(err.Error(), ShouldContainSubstring, "blob not found") }) Convey("push image", func() { imageReference, err := registry.GetImageReference(repoName, "2.0") So(err, ShouldBeNil) So(imageReference, ShouldNotBeNil) imgStore := getImageStoreFromImageReference(repoName, imageReference, log) // upload image // create a blob/layer upload, err := imgStore.NewBlobUpload(repoName) So(err, ShouldBeNil) So(upload, ShouldNotBeEmpty) content := []byte("this is a blob1") buf := bytes.NewBuffer(content) buflen := buf.Len() digest := godigest.FromBytes(content) So(digest, ShouldNotBeNil) blob, err := imgStore.PutBlobChunkStreamed(repoName, upload, buf) So(err, ShouldBeNil) So(blob, ShouldEqual, buflen) bdgst1 := digest bsize1 := len(content) err = imgStore.FinishBlobUpload(repoName, upload, buf, digest) So(err, ShouldBeNil) So(blob, ShouldEqual, buflen) // upload image config blob upload, err = imgStore.NewBlobUpload(repoName) So(err, ShouldBeNil) So(upload, ShouldNotBeEmpty) cblob, cdigest := GetRandomImageConfig() buf = bytes.NewBuffer(cblob) buflen = buf.Len() blob, err = imgStore.PutBlobChunkStreamed(repoName, upload, buf) So(err, ShouldBeNil) So(blob, ShouldEqual, buflen) err = imgStore.FinishBlobUpload(repoName, upload, buf, cdigest) So(err, ShouldBeNil) So(blob, ShouldEqual, buflen) // create a manifest manifest := ispec.Manifest{ Config: ispec.Descriptor{ MediaType: ispec.MediaTypeImageConfig, Digest: cdigest, Size: int64(len(cblob)), }, Layers: []ispec.Descriptor{ { MediaType: ispec.MediaTypeImageLayer, Digest: bdgst1, Size: int64(bsize1), }, }, } manifest.SchemaVersion = 2 content, err = json.Marshal(manifest) So(err, ShouldBeNil) digest = godigest.FromBytes(content) So(digest, ShouldNotBeNil) _, _, err = imgStore.PutImageManifest(repoName, "2.0", ispec.MediaTypeImageManifest, content, nil) So(err, ShouldBeNil) Convey("sync image", func() { ok, err := registry.CanSkipImage(repoName, "2.0", digest) So(ok, ShouldBeFalse) So(err, ShouldBeNil) err = registry.CommitAll(repoName, imageReference) So(err, ShouldBeNil) }) }) Convey("CommitAll with non-existent directory", func() { // Create a registry and get an image reference registry := NewDestinationRegistry(storeController, storeController, nil, log) imageReference, err := registry.GetImageReference("nonexistent-repo", "1.0") So(err, ShouldBeNil) // Remove the directory to simulate it not existing tempImageStore := getImageStoreFromImageReference("nonexistent-repo", imageReference, log) repoDir := path.Join(tempImageStore.RootDir(), "nonexistent-repo") err = os.RemoveAll(repoDir) So(err, ShouldBeNil) // CommitAll should return nil when directory doesn't exist (image was skipped) err = registry.CommitAll("nonexistent-repo", imageReference) So(err, ShouldBeNil) }) Convey("CommitAll with empty directory", func() { // Create a registry and get an image reference registry := NewDestinationRegistry(storeController, storeController, nil, log) imageReference, err := registry.GetImageReference("empty-repo", "1.0") So(err, ShouldBeNil) // Create an empty directory (no index.json, no blobs) tempImageStore := getImageStoreFromImageReference("empty-repo", imageReference, log) repoDir := path.Join(tempImageStore.RootDir(), "empty-repo") err = os.MkdirAll(repoDir, 0o755) So(err, ShouldBeNil) // CommitAll should return nil when directory is empty (image was skipped) err = registry.CommitAll("empty-repo", imageReference) So(err, ShouldBeNil) }) Convey("CommitAll with directory containing files but no index.json", func() { // Create a registry and get an image reference registry := NewDestinationRegistry(storeController, storeController, nil, log) imageReference, err := registry.GetImageReference("inconsistent-repo", "1.0") So(err, ShouldBeNil) // Create a directory with some files but no index.json (inconsistent state) tempImageStore := getImageStoreFromImageReference("inconsistent-repo", imageReference, log) repoDir := path.Join(tempImageStore.RootDir(), "inconsistent-repo") err = os.MkdirAll(repoDir, 0o755) So(err, ShouldBeNil) // Create a dummy file to make directory non-empty dummyFile := path.Join(repoDir, "dummy.txt") err = os.WriteFile(dummyFile, []byte("dummy"), 0o644) So(err, ShouldBeNil) // CommitAll should return an error when directory is not empty but index.json is missing err = registry.CommitAll("inconsistent-repo", imageReference) So(err, ShouldNotBeNil) So(errors.Is(err, zerr.ErrRepoNotFound), ShouldBeTrue) }) Convey("CommitAll with ReadDir error (non-ErrNotExist)", func() { // Create a registry and get an image reference registry := NewDestinationRegistry(storeController, storeController, nil, log) imageReference, err := registry.GetImageReference("error-repo", "1.0") So(err, ShouldBeNil) // Get the repo directory path tempImageStore := getImageStoreFromImageReference("error-repo", imageReference, log) repoDir := path.Join(tempImageStore.RootDir(), "error-repo") // Create a file at the repoDir path instead of a directory // This will cause os.ReadDir to fail with an error that is NOT os.ErrNotExist err = os.MkdirAll(path.Dir(repoDir), 0o755) So(err, ShouldBeNil) err = os.WriteFile(repoDir, []byte("not a directory"), 0o644) So(err, ShouldBeNil) // CommitAll should return the ReadDir error (not ErrNotExist) err = registry.CommitAll("error-repo", imageReference) So(err, ShouldNotBeNil) So(errors.Is(err, os.ErrNotExist), ShouldBeFalse) }) }) } type mockStreamManager struct { streamingImageManifestFn func(repo, reference string) (rcManifest.Manifest, bool) storeImageForStreamingFn func(repo, reference string, m rcManifest.Manifest) error removeStreamingImageFn func(repo, reference string) } func (m *mockStreamManager) StreamingImageManifest(repo, reference string) (rcManifest.Manifest, bool) { if m.streamingImageManifestFn != nil { return m.streamingImageManifestFn(repo, reference) } return nil, false } func (m *mockStreamManager) StoreImageForStreaming(repo, reference string, man rcManifest.Manifest) error { if m.storeImageForStreamingFn != nil { return m.storeImageForStreamingFn(repo, reference, man) } return nil } func (m *mockStreamManager) RemoveStreamingImage(repo, reference string) { if m.removeStreamingImageFn != nil { m.removeStreamingImageFn(repo, reference) } } func (m *mockStreamManager) ConnectClient(_ string, _ io.Writer) (*InFlightBlobCopier, error) { return nil, nil } func (m *mockStreamManager) StreamingBlobReader(reader *rcBlob.BReader) (*rcBlob.BReader, error) { return reader, nil } func (m *mockStreamManager) CachedBlobInfo(_ string) (int64, string, error) { return 0, "", nil } type mockSyncService struct { fetchManifestFn func(ctx context.Context, repo, reference string) (rcManifest.Manifest, error) isStreamingForRepoFn func(repo string) bool getSyncTimeoutFn func() time.Duration syncImageFn func(ctx context.Context, repo, reference string) error canRetryOnErrorFn func() bool } func (s *mockSyncService) FetchManifest(ctx context.Context, repo, reference string) (rcManifest.Manifest, error) { if s.fetchManifestFn != nil { return s.fetchManifestFn(ctx, repo, reference) } return nil, zerr.ErrManifestNotFound } func (s *mockSyncService) IsStreamingForRepo(repo string) bool { if s.isStreamingForRepoFn != nil { return s.isStreamingForRepoFn(repo) } return false } func (s *mockSyncService) GetSyncTimeout() time.Duration { if s.getSyncTimeoutFn != nil { return s.getSyncTimeoutFn() } return 30 * time.Second } func (s *mockSyncService) SyncImage(ctx context.Context, repo, reference string) error { if s.syncImageFn != nil { return s.syncImageFn(ctx, repo, reference) } return nil } func (s *mockSyncService) CanRetryOnError() bool { if s.canRetryOnErrorFn != nil { return s.canRetryOnErrorFn() } return false } func (s *mockSyncService) GetNextRepo(_ string) (string, error) { return "", nil } func (s *mockSyncService) SyncRepo(_ context.Context, _ string) error { return nil } func (s *mockSyncService) SyncReferrers(_ context.Context, _, _ string, _ []string) error { return nil } func (s *mockSyncService) ResetCatalog() {} func newTestManifest(t *testing.T) rcManifest.Manifest { t.Helper() origMan := rcOCIV1.Manifest{ Versioned: rcOCIV1.ManifestSchemaVersion, } m, err := rcManifest.New(rcManifest.WithOrig(origMan)) if err != nil { t.Fatalf("failed to create test manifest: %v", err) } return m } func TestOnDemandSetAndGetStreamManager(t *testing.T) { Convey("StreamManager is nil before SetStreamManager is called", t, func() { onDemand := NewOnDemand(log.NewTestLogger()) So(onDemand.StreamManager(), ShouldBeNil) }) Convey("SetStreamManager stores the manager and StreamManager returns it", t, func() { onDemand := NewOnDemand(log.NewTestLogger()) sm := &mockStreamManager{} onDemand.SetStreamManager(sm) So(onDemand.StreamManager(), ShouldEqual, sm) }) Convey("SetStreamManager can be called multiple times and last value wins", t, func() { onDemand := NewOnDemand(log.NewTestLogger()) sm1 := &mockStreamManager{} sm2 := &mockStreamManager{} onDemand.SetStreamManager(sm1) onDemand.SetStreamManager(sm2) So(onDemand.StreamManager(), ShouldEqual, sm2) }) } func TestOnDemandFetchManifestForStream(t *testing.T) { Convey("returns cached manifest when StreamingImageManifest has image in cache", t, func() { onDemand := NewOnDemand(log.NewTestLogger()) cached := newTestManifest(t) sm := &mockStreamManager{ streamingImageManifestFn: func(repo, reference string) (rcManifest.Manifest, bool) { So(repo, ShouldEqual, "myrepo") So(reference, ShouldEqual, "v1.0") return cached, true }, } onDemand.SetStreamManager(sm) result, err := onDemand.FetchManifestForStream(context.Background(), "myrepo", "v1.0") So(err, ShouldBeNil) So(result, ShouldEqual, cached) }) Convey("returns ErrBlobNotFound when no services are registered", t, func() { onDemand := NewOnDemand(log.NewTestLogger()) sm := &mockStreamManager{ streamingImageManifestFn: func(_, _ string) (rcManifest.Manifest, bool) { return nil, false }, } onDemand.SetStreamManager(sm) result, err := onDemand.FetchManifestForStream(context.Background(), "myrepo", "v1.0") So(errors.Is(err, zerr.ErrBlobNotFound), ShouldBeTrue) So(result, ShouldBeNil) }) Convey("returns ErrBlobNotFound when all services fail to fetch the manifest", t, func() { onDemand := NewOnDemand(log.NewTestLogger()) sm := &mockStreamManager{ streamingImageManifestFn: func(_, _ string) (rcManifest.Manifest, bool) { return nil, false }, } onDemand.SetStreamManager(sm) svc := &mockSyncService{ fetchManifestFn: func(_ context.Context, _, _ string) (rcManifest.Manifest, error) { return nil, errors.New("upstream unavailable") }, getSyncTimeoutFn: func() time.Duration { return 5 * time.Second }, } onDemand.Add(svc) result, err := onDemand.FetchManifestForStream(context.Background(), "myrepo", "v1.0") So(errors.Is(err, zerr.ErrBlobNotFound), ShouldBeTrue) So(result, ShouldBeNil) }) Convey("fetches manifest from service, stores it for streaming and triggers background sync", t, func() { onDemand := NewOnDemand(log.NewTestLogger()) fetched := newTestManifest(t) var storeRepo, storeRef string var storedManifest rcManifest.Manifest sm := &mockStreamManager{ streamingImageManifestFn: func(_, _ string) (rcManifest.Manifest, bool) { return nil, false }, storeImageForStreamingFn: func(repo, reference string, m rcManifest.Manifest) error { storeRepo = repo storeRef = reference storedManifest = m return nil }, } onDemand.SetStreamManager(sm) syncCalled := make(chan struct{}) svc := &mockSyncService{ fetchManifestFn: func(_ context.Context, repo, reference string) (rcManifest.Manifest, error) { return fetched, nil }, getSyncTimeoutFn: func() time.Duration { return 5 * time.Second }, syncImageFn: func(_ context.Context, _, _ string) error { close(syncCalled) return nil }, } onDemand.Add(svc) result, err := onDemand.FetchManifestForStream(context.Background(), "myrepo", "latest") So(err, ShouldBeNil) So(result, ShouldEqual, fetched) So(storeRepo, ShouldEqual, "myrepo") So(storeRef, ShouldEqual, "latest") So(storedManifest, ShouldEqual, fetched) select { case <-syncCalled: // background sync was triggered as expected case <-time.After(2 * time.Second): So("background sync goroutine was not triggered", ShouldBeEmpty) } }) Convey("stops at the first successful service and ignores later services", t, func() { onDemand := NewOnDemand(log.NewTestLogger()) fetched := newTestManifest(t) sm := &mockStreamManager{ streamingImageManifestFn: func(_, _ string) (rcManifest.Manifest, bool) { return nil, false }, } onDemand.SetStreamManager(sm) secondFetchCalled := false svc1 := &mockSyncService{ fetchManifestFn: func(_ context.Context, _, _ string) (rcManifest.Manifest, error) { return fetched, nil }, getSyncTimeoutFn: func() time.Duration { return 5 * time.Second }, } svc2 := &mockSyncService{ fetchManifestFn: func(_ context.Context, _, _ string) (rcManifest.Manifest, error) { secondFetchCalled = true return nil, errors.New("should not be reached") }, getSyncTimeoutFn: func() time.Duration { return 5 * time.Second }, } onDemand.Add(svc1) onDemand.Add(svc2) result, err := onDemand.FetchManifestForStream(context.Background(), "myrepo", "v2.0") So(err, ShouldBeNil) So(result, ShouldEqual, fetched) So(secondFetchCalled, ShouldBeFalse) }) Convey("returns error when StoreImageForStreaming fails", t, func() { onDemand := NewOnDemand(log.NewTestLogger()) fetched := newTestManifest(t) storeErr := errors.New("disk full") sm := &mockStreamManager{ streamingImageManifestFn: func(_, _ string) (rcManifest.Manifest, bool) { return nil, false }, storeImageForStreamingFn: func(_, _ string, _ rcManifest.Manifest) error { return storeErr }, } onDemand.SetStreamManager(sm) svc := &mockSyncService{ fetchManifestFn: func(_ context.Context, _, _ string) (rcManifest.Manifest, error) { return fetched, nil }, getSyncTimeoutFn: func() time.Duration { return 5 * time.Second }, } onDemand.Add(svc) result, err := onDemand.FetchManifestForStream(context.Background(), "myrepo", "v1.0") So(errors.Is(err, storeErr), ShouldBeTrue) So(result, ShouldBeNil) }) } func TestNewClientTimeoutBehavior(t *testing.T) { Convey("Test newClient timeout behavior", t, func() { logger := log.NewTestLogger() zeroRetries := 0 retryDelay := 1 * time.Millisecond Convey("Client respects ResponseHeaderTimeout from config", func() { // Server delay - must be longer than ResponseHeaderTimeout to trigger timeout serverDelay := 1 * time.Second // Create a test server that accepts connections but delays sending headers server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // Sleep longer than the timeout to simulate a server that connects // but doesn't send headers quickly time.Sleep(serverDelay) w.WriteHeader(http.StatusOK) })) defer server.Close() host := strings.TrimPrefix(server.URL, "http://") // Create sync config with short ResponseHeaderTimeout opts := syncconf.RegistryConfig{ URLs: []string{server.URL}, SyncTimeout: syncConstants.DefaultSyncTimeout, ResponseHeaderTimeout: 10 * time.Millisecond, MaxRetries: &zeroRetries, RetryDelay: &retryDelay, } // Create the client using the actual production function client, _, err := newClient(opts, syncconf.CredentialsFile{}, logger) So(err, ShouldBeNil) // Create a reference to the test server r, err := ref.New(host + "/repo:tag") So(err, ShouldBeNil) // Make a request - it should timeout quickly due to ResponseHeaderTimeout start := time.Now() // Use ManifestHead as a lightweight operation to trigger the HTTP request _, err = client.ManifestHead(context.Background(), r) elapsed := time.Since(start) // Should timeout quickly (approx 10ms * retries + overhead) // With 1ms retry delay and 10ms timeout, retries add overhead but should still be < serverDelay So(err, ShouldNotBeNil) // Allow small tolerance for timing variability (5ms) So(elapsed, ShouldBeGreaterThanOrEqualTo, opts.ResponseHeaderTimeout-5*time.Millisecond) So(elapsed, ShouldBeLessThan, serverDelay) }) Convey("Client respects SyncTimeout (overall) from config", func() { // Server delay - must be longer than SyncTimeout to trigger timeout serverDelay := 1 * time.Second // Create a test server that sends headers quickly but delays body transfer server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // Send headers immediately with Content-Length to bypass ResponseHeaderTimeout // but indicate there's a body coming w.Header().Set("Content-Length", "100") w.WriteHeader(http.StatusOK) // Flush headers to ensure they're sent if flusher, ok := w.(http.Flusher); ok { flusher.Flush() } // Delay sending body data - this will trigger SyncTimeout (overall client timeout) time.Sleep(serverDelay) // Write body data (client should timeout before this completes) w.Write(make([]byte, 100)) })) defer server.Close() host := strings.TrimPrefix(server.URL, "http://") // Create sync config with short SyncTimeout opts := syncconf.RegistryConfig{ URLs: []string{server.URL}, SyncTimeout: 10 * time.Millisecond, ResponseHeaderTimeout: syncConstants.DefaultResponseHeaderTimeout, MaxRetries: &zeroRetries, RetryDelay: &retryDelay, } // Create the client using the actual production function client, _, err := newClient(opts, syncconf.CredentialsFile{}, logger) So(err, ShouldBeNil) r, err := ref.New(host + "/repo:tag") So(err, ShouldBeNil) // Make a request - it should timeout due to overall SyncTimeout // Use ManifestGet (not ManifestHead) to test body transfer timeout start := time.Now() _, err = client.ManifestGet(context.Background(), r) elapsed := time.Since(start) // Should timeout within the SyncTimeout period (+ retries) // Elapsed should be >= SyncTimeout (since that's when it times out) and < serverDelay So(err, ShouldNotBeNil) // Allow small tolerance for timing variability (5ms) So(elapsed, ShouldBeGreaterThanOrEqualTo, opts.SyncTimeout-5*time.Millisecond) So(elapsed, ShouldBeLessThan, serverDelay) }) }) } func TestBaseServiceIsStreamingForRepo(t *testing.T) { streamEnabled := true streamDisabled := false Convey("returns false when Stream is not configured", t, func() { conf := syncconf.RegistryConfig{ URLs: []string{"http://localhost"}, } service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil, mocks.MetaDBMock{}, log.NewTestLogger()) So(err, ShouldBeNil) So(service.IsStreamingForRepo("anyrepo"), ShouldBeFalse) }) Convey("returns false when Stream is explicitly disabled", t, func() { conf := syncconf.RegistryConfig{ URLs: []string{"http://localhost"}, Stream: &streamDisabled, } service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil, mocks.MetaDBMock{}, log.NewTestLogger()) So(err, ShouldBeNil) So(service.IsStreamingForRepo("anyrepo"), ShouldBeFalse) }) Convey("returns true when streaming enabled and no content filter configured", t, func() { conf := syncconf.RegistryConfig{ URLs: []string{"http://localhost"}, Stream: &streamEnabled, // Content is empty — all repos allowed } service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil, mocks.MetaDBMock{}, log.NewTestLogger()) So(err, ShouldBeNil) So(service.IsStreamingForRepo("anyrepo"), ShouldBeTrue) So(service.IsStreamingForRepo("other/repo"), ShouldBeTrue) }) Convey("returns true when streaming enabled and repo matches content filter", t, func() { conf := syncconf.RegistryConfig{ URLs: []string{"http://localhost"}, Stream: &streamEnabled, Content: []syncconf.Content{{Prefix: "myrepo"}}, } service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil, mocks.MetaDBMock{}, log.NewTestLogger()) So(err, ShouldBeNil) So(service.IsStreamingForRepo("myrepo"), ShouldBeTrue) }) Convey("returns false when streaming enabled but repo does not match content filter", t, func() { conf := syncconf.RegistryConfig{ URLs: []string{"http://localhost"}, Stream: &streamEnabled, Content: []syncconf.Content{{Prefix: "allowed-prefix"}}, } service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil, mocks.MetaDBMock{}, log.NewTestLogger()) So(err, ShouldBeNil) So(service.IsStreamingForRepo("unrelated-repo"), ShouldBeFalse) }) } func TestBaseServiceFetchManifestWithStreaming(t *testing.T) { streamEnabled := true Convey("returns ErrSyncImageFilteredOut when content filter rejects the repo", t, func() { conf := syncconf.RegistryConfig{ URLs: []string{"http://localhost"}, Stream: &streamEnabled, Content: []syncconf.Content{{Prefix: "upstream-prefix"}}, } service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil, mocks.MetaDBMock{}, log.NewTestLogger()) So(err, ShouldBeNil) m, err := service.FetchManifest(context.Background(), "myrepo", "latest") So(errors.Is(err, zerr.ErrSyncImageFilteredOut), ShouldBeTrue) So(m, ShouldBeNil) }) Convey("propagates GetImageReference error when streaming is enabled", t, func() { conf := syncconf.RegistryConfig{ URLs: []string{"http://localhost"}, Stream: &streamEnabled, } service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil, mocks.MetaDBMock{}, log.NewTestLogger()) So(err, ShouldBeNil) getRefErr := errors.New("ref build failed") service.remote = &mocks.SyncRemoteMock{ GetImageReferenceFn: func(repo, tag string) (ref.Ref, error) { return ref.Ref{}, getRefErr }, } m, err := service.FetchManifest(context.Background(), "myrepo", "latest") So(errors.Is(err, getRefErr), ShouldBeTrue) So(m, ShouldBeNil) }) Convey("propagates ManifestGet error when GetImageReference succeeds but registry is unreachable", t, func() { conf := syncconf.RegistryConfig{ URLs: []string{"http://localhost"}, Stream: &streamEnabled, // No content filter } service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil, mocks.MetaDBMock{}, log.NewTestLogger()) So(err, ShouldBeNil) service.remote = &mocks.SyncRemoteMock{ GetImageReferenceFn: func(repo, tag string) (ref.Ref, error) { // Return a syntactically valid ref pointing to a non-existent host. return ref.New("localhost:0/" + repo + ":" + tag) }, GetHostNameFn: func() string { return "localhost:0" }, } m, err := service.FetchManifest(context.Background(), "myrepo", "latest") So(err, ShouldNotBeNil) So(m, ShouldBeNil) }) }