mirror of
https://github.com/project-zot/zot.git
synced 2026-06-18 05:28:07 +08:00
da426850e7
* chore: Update golangci-lint Signed-off-by: Lars Francke <git@lars-francke.de> * chore: fix all golangci-lint issues - Remove deprecated `// +build` tags - Fix godoclint, modernize, wsl_v5, govet, lll, gci, noctx issues - Update linter configuration - Modernize code to use Go 1.22+ features (for range N, slices.Contains, etc.) - Update make check lint the privileged tests Signed-off-by: Andrei Aaron <andreifdaaron@gmail.com> --------- Signed-off-by: Lars Francke <git@lars-francke.de> Signed-off-by: Andrei Aaron <andreifdaaron@gmail.com> Co-authored-by: Lars Francke <git@lars-francke.de>
350 lines
8.7 KiB
Go
350 lines
8.7 KiB
Go
//go:build events
|
|
|
|
package events_test
|
|
|
|
import (
|
|
"fmt"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"testing"
|
|
"time"
|
|
|
|
cloudevents "github.com/cloudevents/sdk-go/v2"
|
|
cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
|
|
"github.com/google/go-containerregistry/pkg/v1/types"
|
|
"github.com/nats-io/nats-server/v2/server"
|
|
"github.com/nats-io/nats.go"
|
|
. "github.com/smartystreets/goconvey/convey"
|
|
"k8s.io/apimachinery/pkg/util/rand"
|
|
|
|
zerr "zotregistry.dev/zot/v2/errors"
|
|
eventsconf "zotregistry.dev/zot/v2/pkg/extensions/config/events"
|
|
"zotregistry.dev/zot/v2/pkg/extensions/events"
|
|
"zotregistry.dev/zot/v2/pkg/log"
|
|
)
|
|
|
|
type mockSink struct {
|
|
store chan *cloudevents.Event
|
|
}
|
|
|
|
func (s *mockSink) Emit(e *cloudevents.Event) cloudevents.Result {
|
|
s.store <- e
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *mockSink) Close() error {
|
|
return nil
|
|
}
|
|
|
|
var _ events.Sink = (*mockSink)(nil)
|
|
|
|
func newMockSink() *mockSink {
|
|
return &mockSink{
|
|
store: make(chan *cloudevents.Event),
|
|
}
|
|
}
|
|
|
|
func TestEventSinkMissing(t *testing.T) {
|
|
Convey("missing sink", t, func() {
|
|
_, err := events.NewRecorder(log.NewTestLogger())
|
|
So(err, ShouldNotBeNil)
|
|
So(err, ShouldEqual, zerr.ErrEventSinkIsNil)
|
|
})
|
|
}
|
|
|
|
func TestEvents(t *testing.T) {
|
|
Convey("emits events", t, func() {
|
|
sink := newMockSink()
|
|
recorder, err := events.NewRecorder(log.NewTestLogger(), sink)
|
|
So(err, ShouldBeNil)
|
|
Convey("repository created", func() {
|
|
recorder.RepositoryCreated("test")
|
|
ev := <-sink.store
|
|
So(ev.Type(), ShouldEqual, events.RepositoryCreatedEventType.String())
|
|
})
|
|
Convey("image updated", func() {
|
|
recorder.ImageUpdated("test", "v1", "", string(types.OCIManifestSchema1), "")
|
|
ev := <-sink.store
|
|
So(ev.Type(), ShouldEqual, events.ImageUpdatedEventType.String())
|
|
})
|
|
Convey("image deleted", func() {
|
|
recorder.ImageDeleted("test", "v1", "", string(types.OCIManifestSchema1))
|
|
ev := <-sink.store
|
|
So(ev.Type(), ShouldEqual, events.ImageDeletedEventType.String())
|
|
})
|
|
Convey("image lint failed", func() {
|
|
recorder.ImageLintFailed("test", "v1", "", string(types.OCIManifestSchema1), "")
|
|
ev := <-sink.store
|
|
So(ev.Type(), ShouldEqual, events.ImageLintFailedEventType.String())
|
|
})
|
|
})
|
|
}
|
|
|
|
func TestHTTPSinkEvents(t *testing.T) {
|
|
Convey("emits events to http sink", t, func() {
|
|
eventChan := make(chan *cloudevents.Event, 1)
|
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
event, err := cehttp.NewEventFromHTTPRequest(r)
|
|
if err != nil {
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
|
|
return
|
|
}
|
|
|
|
eventChan <- event
|
|
|
|
w.WriteHeader(http.StatusOK)
|
|
}))
|
|
|
|
defer server.Close()
|
|
|
|
config := eventsconf.SinkConfig{
|
|
Type: eventsconf.HTTP,
|
|
Address: server.URL,
|
|
Timeout: 5 * time.Second,
|
|
}
|
|
sink, err := events.NewHTTPSink(config)
|
|
So(err, ShouldBeNil)
|
|
|
|
recorder, err := events.NewRecorder(log.NewTestLogger(), sink)
|
|
So(err, ShouldBeNil)
|
|
|
|
Convey("repository created", func() {
|
|
recorder.RepositoryCreated("test")
|
|
e := getEvent(t, eventChan)
|
|
So(e, ShouldNotBeNil)
|
|
So(e.Type(), ShouldEqual, events.RepositoryCreatedEventType.String())
|
|
})
|
|
|
|
Convey("image updated", func() {
|
|
recorder.ImageUpdated("test", "v1", "", string(types.OCIManifestSchema1), "")
|
|
e := getEvent(t, eventChan)
|
|
So(e, ShouldNotBeNil)
|
|
So(e.Type(), ShouldEqual, events.ImageUpdatedEventType.String())
|
|
})
|
|
|
|
Convey("image deleted", func() {
|
|
recorder.ImageDeleted("test", "v1", "", string(types.OCIManifestSchema1))
|
|
e := getEvent(t, eventChan)
|
|
So(e, ShouldNotBeNil)
|
|
So(e.Type(), ShouldEqual, events.ImageDeletedEventType.String())
|
|
})
|
|
|
|
Convey("image lint failed", func() {
|
|
recorder.ImageLintFailed("test", "v1", "", string(types.OCIManifestSchema1), "")
|
|
e := getEvent(t, eventChan)
|
|
So(e, ShouldNotBeNil)
|
|
So(e.Type(), ShouldEqual, events.ImageLintFailedEventType.String())
|
|
})
|
|
})
|
|
}
|
|
|
|
func TestNATSSinkEvents(t *testing.T) {
|
|
Convey("emits events to nats sink", t, func() {
|
|
Convey("repository created", func() {
|
|
natsServer, natsURL := setupTestNATSServer(t)
|
|
defer natsServer.Shutdown()
|
|
|
|
testChannel := "test-events-" + randomString()
|
|
|
|
recorder, err := createRecorder(t, natsURL, testChannel)
|
|
defer recorder.Close()
|
|
So(err, ShouldBeNil)
|
|
|
|
eventChan := make(chan *cloudevents.Event, 1)
|
|
|
|
nc, err := createSubscription(t, natsURL, testChannel, eventChan)
|
|
defer nc.Close()
|
|
So(err, ShouldBeNil)
|
|
|
|
recorder.RepositoryCreated("test")
|
|
|
|
e := getEvent(t, eventChan)
|
|
So(e, ShouldNotBeNil)
|
|
So(e.Type(), ShouldEqual, events.RepositoryCreatedEventType.String())
|
|
})
|
|
|
|
Convey("image updated", func() {
|
|
natsServer, natsURL := setupTestNATSServer(t)
|
|
defer natsServer.Shutdown()
|
|
|
|
testChannel := "test-events-" + randomString()
|
|
|
|
recorder, err := createRecorder(t, natsURL, testChannel)
|
|
So(err, ShouldBeNil)
|
|
defer recorder.Close()
|
|
|
|
eventChan := make(chan *cloudevents.Event, 1)
|
|
|
|
nc, err := createSubscription(t, natsURL, testChannel, eventChan)
|
|
defer nc.Close()
|
|
So(err, ShouldBeNil)
|
|
|
|
recorder.ImageUpdated("test", "v1", "", string(types.OCIManifestSchema1), "")
|
|
|
|
e := getEvent(t, eventChan)
|
|
So(e, ShouldNotBeNil)
|
|
So(e.Type(), ShouldEqual, events.ImageUpdatedEventType.String())
|
|
})
|
|
|
|
Convey("image deleted", func() {
|
|
natsServer, natsURL := setupTestNATSServer(t)
|
|
defer natsServer.Shutdown()
|
|
|
|
testChannel := "test-events-" + randomString()
|
|
|
|
eventChan := make(chan *cloudevents.Event, 1)
|
|
|
|
nc, err := createSubscription(t, natsURL, testChannel, eventChan)
|
|
defer nc.Close()
|
|
So(err, ShouldBeNil)
|
|
|
|
recorder, err := createRecorder(t, natsURL, testChannel)
|
|
defer recorder.Close()
|
|
So(err, ShouldBeNil)
|
|
|
|
recorder.ImageDeleted("test", "v1", "", string(types.OCIManifestSchema1))
|
|
|
|
e := getEvent(t, eventChan)
|
|
So(e, ShouldNotBeNil)
|
|
So(e.Type(), ShouldEqual, events.ImageDeletedEventType.String())
|
|
})
|
|
|
|
Convey("image lint failed", func() {
|
|
natsServer, natsURL := setupTestNATSServer(t)
|
|
defer natsServer.Shutdown()
|
|
|
|
testChannel := "test-events-" + randomString()
|
|
|
|
recorder, err := createRecorder(t, natsURL, testChannel)
|
|
defer recorder.Close()
|
|
So(err, ShouldBeNil)
|
|
|
|
eventChan := make(chan *cloudevents.Event, 1)
|
|
|
|
nc, err := createSubscription(t, natsURL, testChannel, eventChan)
|
|
defer nc.Close()
|
|
So(err, ShouldBeNil)
|
|
|
|
recorder.ImageLintFailed("test", "v1", "", string(types.OCIManifestSchema1), "")
|
|
|
|
e := getEvent(t, eventChan)
|
|
So(e, ShouldNotBeNil)
|
|
So(e.Type(), ShouldEqual, events.ImageLintFailedEventType.String())
|
|
})
|
|
})
|
|
}
|
|
|
|
func setupTestNATSServer(t *testing.T) (*server.Server, string) {
|
|
t.Helper()
|
|
|
|
opts := server.Options{
|
|
Host: "127.0.0.1",
|
|
Port: -1, // Use random available port
|
|
NoLog: true,
|
|
NoSigs: true,
|
|
MaxControlLine: 4096,
|
|
}
|
|
|
|
natsServer, err := server.NewServer(&opts)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
go natsServer.Start()
|
|
|
|
if !natsServer.ReadyForConnections(5 * time.Second) {
|
|
panic("NATS server failed to start")
|
|
}
|
|
|
|
return natsServer, natsServer.ClientURL()
|
|
}
|
|
|
|
func createRecorder(t *testing.T, natsURL, testChannel string) (events.Recorder, error) {
|
|
t.Helper()
|
|
config := eventsconf.SinkConfig{
|
|
Type: eventsconf.NATS,
|
|
Address: natsURL,
|
|
Channel: testChannel,
|
|
Timeout: 15 * time.Second,
|
|
}
|
|
|
|
sink, err := events.NewNATSSink(config)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
recorder, err := events.NewRecorder(log.NewTestLogger(), sink)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return recorder, nil
|
|
}
|
|
|
|
func createSubscription(t *testing.T, natsURL, channelName string, bus chan *cloudevents.Event) (*nats.Conn, error) {
|
|
t.Helper()
|
|
|
|
natsConnection, err := nats.Connect(natsURL)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
_, err = natsConnection.Subscribe(channelName, func(msg *nats.Msg) {
|
|
event := cloudevents.NewEvent()
|
|
|
|
headers := msg.Header
|
|
event.SetID(headers.Get("ce-id"))
|
|
event.SetSource(headers.Get("ce-source"))
|
|
event.SetType(headers.Get("ce-type"))
|
|
|
|
if subj := headers.Get("ce-subject"); subj != "" {
|
|
event.SetSubject(subj)
|
|
}
|
|
|
|
if err := event.UnmarshalJSON(msg.Data); err == nil {
|
|
bus <- &event
|
|
}
|
|
|
|
_ = msg.Respond([]byte("OK"))
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
err = natsConnection.FlushTimeout(2 * time.Second)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("flush failed: %w", err)
|
|
}
|
|
|
|
return natsConnection, nil
|
|
}
|
|
|
|
func getEvent(t *testing.T, c chan *cloudevents.Event) *cloudevents.Event {
|
|
t.Helper()
|
|
|
|
var evt *cloudevents.Event
|
|
select {
|
|
case evt = <-c:
|
|
case <-time.After(time.Second * 2):
|
|
t.Fatal("timed out waiting for event")
|
|
}
|
|
|
|
return evt
|
|
}
|
|
|
|
const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
|
|
|
|
func randomString() string {
|
|
rand.Seed(time.Now().UnixNano())
|
|
|
|
buf := make([]byte, 5)
|
|
|
|
for i := range buf {
|
|
buf[i] = charset[rand.Intn(len(charset))]
|
|
}
|
|
|
|
return string(buf)
|
|
}
|