Files
zot/pkg/extensions/events/events_test.go
T
Andrei Aaron da426850e7 chore: update golangci-lint and fix all issues (#3575)
* chore: Update golangci-lint

Signed-off-by: Lars Francke <git@lars-francke.de>

* chore: fix all golangci-lint issues

- Remove deprecated `// +build` tags
- Fix godoclint, modernize, wsl_v5, govet, lll, gci, noctx issues
- Update linter configuration
- Modernize code to use Go 1.22+ features (for range N, slices.Contains, etc.)
- Update make check lint the privileged tests

Signed-off-by: Andrei Aaron <andreifdaaron@gmail.com>

---------

Signed-off-by: Lars Francke <git@lars-francke.de>
Signed-off-by: Andrei Aaron <andreifdaaron@gmail.com>
Co-authored-by: Lars Francke <git@lars-francke.de>
2025-11-22 23:36:48 +02:00

350 lines
8.7 KiB
Go

//go:build events
package events_test
import (
"fmt"
"net/http"
"net/http/httptest"
"testing"
"time"
cloudevents "github.com/cloudevents/sdk-go/v2"
cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
"github.com/google/go-containerregistry/pkg/v1/types"
"github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go"
. "github.com/smartystreets/goconvey/convey"
"k8s.io/apimachinery/pkg/util/rand"
zerr "zotregistry.dev/zot/v2/errors"
eventsconf "zotregistry.dev/zot/v2/pkg/extensions/config/events"
"zotregistry.dev/zot/v2/pkg/extensions/events"
"zotregistry.dev/zot/v2/pkg/log"
)
type mockSink struct {
store chan *cloudevents.Event
}
func (s *mockSink) Emit(e *cloudevents.Event) cloudevents.Result {
s.store <- e
return nil
}
func (s *mockSink) Close() error {
return nil
}
var _ events.Sink = (*mockSink)(nil)
func newMockSink() *mockSink {
return &mockSink{
store: make(chan *cloudevents.Event),
}
}
func TestEventSinkMissing(t *testing.T) {
Convey("missing sink", t, func() {
_, err := events.NewRecorder(log.NewTestLogger())
So(err, ShouldNotBeNil)
So(err, ShouldEqual, zerr.ErrEventSinkIsNil)
})
}
func TestEvents(t *testing.T) {
Convey("emits events", t, func() {
sink := newMockSink()
recorder, err := events.NewRecorder(log.NewTestLogger(), sink)
So(err, ShouldBeNil)
Convey("repository created", func() {
recorder.RepositoryCreated("test")
ev := <-sink.store
So(ev.Type(), ShouldEqual, events.RepositoryCreatedEventType.String())
})
Convey("image updated", func() {
recorder.ImageUpdated("test", "v1", "", string(types.OCIManifestSchema1), "")
ev := <-sink.store
So(ev.Type(), ShouldEqual, events.ImageUpdatedEventType.String())
})
Convey("image deleted", func() {
recorder.ImageDeleted("test", "v1", "", string(types.OCIManifestSchema1))
ev := <-sink.store
So(ev.Type(), ShouldEqual, events.ImageDeletedEventType.String())
})
Convey("image lint failed", func() {
recorder.ImageLintFailed("test", "v1", "", string(types.OCIManifestSchema1), "")
ev := <-sink.store
So(ev.Type(), ShouldEqual, events.ImageLintFailedEventType.String())
})
})
}
func TestHTTPSinkEvents(t *testing.T) {
Convey("emits events to http sink", t, func() {
eventChan := make(chan *cloudevents.Event, 1)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
event, err := cehttp.NewEventFromHTTPRequest(r)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
eventChan <- event
w.WriteHeader(http.StatusOK)
}))
defer server.Close()
config := eventsconf.SinkConfig{
Type: eventsconf.HTTP,
Address: server.URL,
Timeout: 5 * time.Second,
}
sink, err := events.NewHTTPSink(config)
So(err, ShouldBeNil)
recorder, err := events.NewRecorder(log.NewTestLogger(), sink)
So(err, ShouldBeNil)
Convey("repository created", func() {
recorder.RepositoryCreated("test")
e := getEvent(t, eventChan)
So(e, ShouldNotBeNil)
So(e.Type(), ShouldEqual, events.RepositoryCreatedEventType.String())
})
Convey("image updated", func() {
recorder.ImageUpdated("test", "v1", "", string(types.OCIManifestSchema1), "")
e := getEvent(t, eventChan)
So(e, ShouldNotBeNil)
So(e.Type(), ShouldEqual, events.ImageUpdatedEventType.String())
})
Convey("image deleted", func() {
recorder.ImageDeleted("test", "v1", "", string(types.OCIManifestSchema1))
e := getEvent(t, eventChan)
So(e, ShouldNotBeNil)
So(e.Type(), ShouldEqual, events.ImageDeletedEventType.String())
})
Convey("image lint failed", func() {
recorder.ImageLintFailed("test", "v1", "", string(types.OCIManifestSchema1), "")
e := getEvent(t, eventChan)
So(e, ShouldNotBeNil)
So(e.Type(), ShouldEqual, events.ImageLintFailedEventType.String())
})
})
}
func TestNATSSinkEvents(t *testing.T) {
Convey("emits events to nats sink", t, func() {
Convey("repository created", func() {
natsServer, natsURL := setupTestNATSServer(t)
defer natsServer.Shutdown()
testChannel := "test-events-" + randomString()
recorder, err := createRecorder(t, natsURL, testChannel)
defer recorder.Close()
So(err, ShouldBeNil)
eventChan := make(chan *cloudevents.Event, 1)
nc, err := createSubscription(t, natsURL, testChannel, eventChan)
defer nc.Close()
So(err, ShouldBeNil)
recorder.RepositoryCreated("test")
e := getEvent(t, eventChan)
So(e, ShouldNotBeNil)
So(e.Type(), ShouldEqual, events.RepositoryCreatedEventType.String())
})
Convey("image updated", func() {
natsServer, natsURL := setupTestNATSServer(t)
defer natsServer.Shutdown()
testChannel := "test-events-" + randomString()
recorder, err := createRecorder(t, natsURL, testChannel)
So(err, ShouldBeNil)
defer recorder.Close()
eventChan := make(chan *cloudevents.Event, 1)
nc, err := createSubscription(t, natsURL, testChannel, eventChan)
defer nc.Close()
So(err, ShouldBeNil)
recorder.ImageUpdated("test", "v1", "", string(types.OCIManifestSchema1), "")
e := getEvent(t, eventChan)
So(e, ShouldNotBeNil)
So(e.Type(), ShouldEqual, events.ImageUpdatedEventType.String())
})
Convey("image deleted", func() {
natsServer, natsURL := setupTestNATSServer(t)
defer natsServer.Shutdown()
testChannel := "test-events-" + randomString()
eventChan := make(chan *cloudevents.Event, 1)
nc, err := createSubscription(t, natsURL, testChannel, eventChan)
defer nc.Close()
So(err, ShouldBeNil)
recorder, err := createRecorder(t, natsURL, testChannel)
defer recorder.Close()
So(err, ShouldBeNil)
recorder.ImageDeleted("test", "v1", "", string(types.OCIManifestSchema1))
e := getEvent(t, eventChan)
So(e, ShouldNotBeNil)
So(e.Type(), ShouldEqual, events.ImageDeletedEventType.String())
})
Convey("image lint failed", func() {
natsServer, natsURL := setupTestNATSServer(t)
defer natsServer.Shutdown()
testChannel := "test-events-" + randomString()
recorder, err := createRecorder(t, natsURL, testChannel)
defer recorder.Close()
So(err, ShouldBeNil)
eventChan := make(chan *cloudevents.Event, 1)
nc, err := createSubscription(t, natsURL, testChannel, eventChan)
defer nc.Close()
So(err, ShouldBeNil)
recorder.ImageLintFailed("test", "v1", "", string(types.OCIManifestSchema1), "")
e := getEvent(t, eventChan)
So(e, ShouldNotBeNil)
So(e.Type(), ShouldEqual, events.ImageLintFailedEventType.String())
})
})
}
func setupTestNATSServer(t *testing.T) (*server.Server, string) {
t.Helper()
opts := server.Options{
Host: "127.0.0.1",
Port: -1, // Use random available port
NoLog: true,
NoSigs: true,
MaxControlLine: 4096,
}
natsServer, err := server.NewServer(&opts)
if err != nil {
panic(err)
}
go natsServer.Start()
if !natsServer.ReadyForConnections(5 * time.Second) {
panic("NATS server failed to start")
}
return natsServer, natsServer.ClientURL()
}
func createRecorder(t *testing.T, natsURL, testChannel string) (events.Recorder, error) {
t.Helper()
config := eventsconf.SinkConfig{
Type: eventsconf.NATS,
Address: natsURL,
Channel: testChannel,
Timeout: 15 * time.Second,
}
sink, err := events.NewNATSSink(config)
if err != nil {
return nil, err
}
recorder, err := events.NewRecorder(log.NewTestLogger(), sink)
if err != nil {
return nil, err
}
return recorder, nil
}
func createSubscription(t *testing.T, natsURL, channelName string, bus chan *cloudevents.Event) (*nats.Conn, error) {
t.Helper()
natsConnection, err := nats.Connect(natsURL)
if err != nil {
return nil, err
}
_, err = natsConnection.Subscribe(channelName, func(msg *nats.Msg) {
event := cloudevents.NewEvent()
headers := msg.Header
event.SetID(headers.Get("ce-id"))
event.SetSource(headers.Get("ce-source"))
event.SetType(headers.Get("ce-type"))
if subj := headers.Get("ce-subject"); subj != "" {
event.SetSubject(subj)
}
if err := event.UnmarshalJSON(msg.Data); err == nil {
bus <- &event
}
_ = msg.Respond([]byte("OK"))
})
if err != nil {
return nil, err
}
err = natsConnection.FlushTimeout(2 * time.Second)
if err != nil {
return nil, fmt.Errorf("flush failed: %w", err)
}
return natsConnection, nil
}
func getEvent(t *testing.T, c chan *cloudevents.Event) *cloudevents.Event {
t.Helper()
var evt *cloudevents.Event
select {
case evt = <-c:
case <-time.After(time.Second * 2):
t.Fatal("timed out waiting for event")
}
return evt
}
const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
func randomString() string {
rand.Seed(time.Now().UnixNano())
buf := make([]byte, 5)
for i := range buf {
buf[i] = charset[rand.Intn(len(charset))]
}
return string(buf)
}