mirror of
https://github.com/project-zot/zot.git
synced 2026-06-19 05:57:57 +08:00
bc5fd1a357
* feat: add events config Signed-off-by: Piaras Hoban <phoban01@gmail.com> * feat: implement event support with log sink Signed-off-by: Piaras Hoban <phoban01@gmail.com> * feat: integrate events and update tests Signed-off-by: Piaras Hoban <phoban01@gmail.com> * refactor: update event config Signed-off-by: Piaras Hoban <phoban01@gmail.com> * feat: implement http and nats sinks. remove log sink Signed-off-by: Piaras Hoban <phoban01@gmail.com> * refactor: events extension setup Signed-off-by: Piaras Hoban <phoban01@gmail.com> * chore: cleanup tests to use nil event recorder Signed-off-by: Piaras Hoban <phoban01@gmail.com> * chore: update events config example and add more logging Signed-off-by: Piaras Hoban <phoban01@gmail.com> * refactor: better use of build tags for minimal binary Signed-off-by: Piaras Hoban <phoban01@gmail.com> * fix: missing store param in evelated privileges tests Signed-off-by: Piaras Hoban <phoban01@gmail.com> * fix: regression in config decoding Signed-off-by: Piaras Hoban <phoban01@gmail.com> * chore: update check logs script to enable cross-platform usage via GREP_BIN_PATH envvar Signed-off-by: Piaras Hoban <phoban01@gmail.com> * chore: fix log lint issue for events Signed-off-by: Piaras Hoban <phoban01@gmail.com> * chore: fix failing events disabled test Signed-off-by: Piaras Hoban <phoban01@gmail.com> * test: add blackbox tests for events Signed-off-by: Piaras Hoban <phoban01@gmail.com> * chore: specify architecture when downloading binaries in Makefile Signed-off-by: Piaras Hoban <phoban01@gmail.com> * chore: improve failure handling when no valid sinks are provided Signed-off-by: Piaras Hoban <phoban01@gmail.com> * test: fix data race in events test Signed-off-by: Piaras Hoban <phoban01@gmail.com> * chore: cleanup event decoding Signed-off-by: Piaras Hoban <phoban01@gmail.com> * test: fix logging tests Signed-off-by: Piaras Hoban <phoban01@gmail.com> * test: make nats server test more reliable Signed-off-by: Piaras Hoban <phoban01@gmail.com> * chore: go mod cleanup Signed-off-by: Piaras Hoban <phoban01@gmail.com> * test: add sleep when setting up nats client Signed-off-by: Piaras Hoban <phoban01@gmail.com> * fix: ensure event sink errors do not propogate Signed-off-by: Piaras Hoban <phoban01@gmail.com> * test: increase coverage for events Signed-off-by: Piaras Hoban <phoban01@gmail.com> * feat(events): Refactor events to be non-blocking from caller. Signed-off-by: Asgeir Nilsen <asgeir.nilsen@bouvet.no> Signed-off-by: Piaras Hoban <phoban01@gmail.com> * chore: remove harded-coded linux Co-authored-by: Andrei Aaron <andreifdaaron@gmail.com> Signed-off-by: Piaras Hoban <phoban01@gmail.com> * feat(events): fail to start if incorrect event sink is configured Signed-off-by: Piaras Hoban <phoban01@gmail.com> * test: allow cli tests to return errors instead of panic Signed-off-by: Piaras Hoban <phoban01@gmail.com> * chore: bump nats server to v2.11.3 Signed-off-by: Piaras Hoban <phoban01@gmail.com> --------- Signed-off-by: Piaras Hoban <phoban01@gmail.com> Signed-off-by: Asgeir Nilsen <asgeir.nilsen@bouvet.no> Co-authored-by: Asgeir Nilsen <asgeir.nilsen@bouvet.no> Co-authored-by: Andrei Aaron <andreifdaaron@gmail.com>
351 lines
8.7 KiB
Go
351 lines
8.7 KiB
Go
//go:build events
|
|
// +build events
|
|
|
|
package events_test
|
|
|
|
import (
|
|
"fmt"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"testing"
|
|
"time"
|
|
|
|
cloudevents "github.com/cloudevents/sdk-go/v2"
|
|
cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
|
|
"github.com/google/go-containerregistry/pkg/v1/types"
|
|
"github.com/nats-io/nats-server/v2/server"
|
|
"github.com/nats-io/nats.go"
|
|
. "github.com/smartystreets/goconvey/convey"
|
|
"k8s.io/apimachinery/pkg/util/rand"
|
|
|
|
zerr "zotregistry.dev/zot/errors"
|
|
eventsconf "zotregistry.dev/zot/pkg/extensions/config/events"
|
|
"zotregistry.dev/zot/pkg/extensions/events"
|
|
"zotregistry.dev/zot/pkg/log"
|
|
)
|
|
|
|
type mockSink struct {
|
|
store chan *cloudevents.Event
|
|
}
|
|
|
|
func (s *mockSink) Emit(e *cloudevents.Event) cloudevents.Result {
|
|
s.store <- e
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *mockSink) Close() error {
|
|
return nil
|
|
}
|
|
|
|
var _ events.Sink = (*mockSink)(nil)
|
|
|
|
func newMockSink() *mockSink {
|
|
return &mockSink{
|
|
store: make(chan *cloudevents.Event),
|
|
}
|
|
}
|
|
|
|
func TestEventSinkMissing(t *testing.T) {
|
|
Convey("missing sink", t, func() {
|
|
_, err := events.NewRecorder(log.NewLogger("debug", ""))
|
|
So(err, ShouldNotBeNil)
|
|
So(err, ShouldEqual, zerr.ErrEventSinkIsNil)
|
|
})
|
|
}
|
|
|
|
func TestEvents(t *testing.T) {
|
|
Convey("emits events", t, func() {
|
|
sink := newMockSink()
|
|
recorder, err := events.NewRecorder(log.NewLogger("debug", ""), sink)
|
|
So(err, ShouldBeNil)
|
|
Convey("repository created", func() {
|
|
recorder.RepositoryCreated("test")
|
|
ev := <-sink.store
|
|
So(ev.Type(), ShouldEqual, events.RepositoryCreatedEventType.String())
|
|
})
|
|
Convey("image updated", func() {
|
|
recorder.ImageUpdated("test", "v1", "", string(types.OCIManifestSchema1), "")
|
|
ev := <-sink.store
|
|
So(ev.Type(), ShouldEqual, events.ImageUpdatedEventType.String())
|
|
})
|
|
Convey("image deleted", func() {
|
|
recorder.ImageDeleted("test", "v1", "", string(types.OCIManifestSchema1))
|
|
ev := <-sink.store
|
|
So(ev.Type(), ShouldEqual, events.ImageDeletedEventType.String())
|
|
})
|
|
Convey("image lint failed", func() {
|
|
recorder.ImageLintFailed("test", "v1", "", string(types.OCIManifestSchema1), "")
|
|
ev := <-sink.store
|
|
So(ev.Type(), ShouldEqual, events.ImageLintFailedEventType.String())
|
|
})
|
|
})
|
|
}
|
|
|
|
func TestHTTPSinkEvents(t *testing.T) {
|
|
Convey("emits events to http sink", t, func() {
|
|
eventChan := make(chan *cloudevents.Event, 1)
|
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
event, err := cehttp.NewEventFromHTTPRequest(r)
|
|
if err != nil {
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
|
|
return
|
|
}
|
|
|
|
eventChan <- event
|
|
|
|
w.WriteHeader(http.StatusOK)
|
|
}))
|
|
|
|
defer server.Close()
|
|
|
|
config := eventsconf.SinkConfig{
|
|
Type: eventsconf.HTTP,
|
|
Address: server.URL,
|
|
Timeout: 5 * time.Second,
|
|
}
|
|
sink, err := events.NewHTTPSink(config)
|
|
So(err, ShouldBeNil)
|
|
|
|
recorder, err := events.NewRecorder(log.NewLogger("debug", ""), sink)
|
|
So(err, ShouldBeNil)
|
|
|
|
Convey("repository created", func() {
|
|
recorder.RepositoryCreated("test")
|
|
e := getEvent(t, eventChan)
|
|
So(e, ShouldNotBeNil)
|
|
So(e.Type(), ShouldEqual, events.RepositoryCreatedEventType.String())
|
|
})
|
|
|
|
Convey("image updated", func() {
|
|
recorder.ImageUpdated("test", "v1", "", string(types.OCIManifestSchema1), "")
|
|
e := getEvent(t, eventChan)
|
|
So(e, ShouldNotBeNil)
|
|
So(e.Type(), ShouldEqual, events.ImageUpdatedEventType.String())
|
|
})
|
|
|
|
Convey("image deleted", func() {
|
|
recorder.ImageDeleted("test", "v1", "", string(types.OCIManifestSchema1))
|
|
e := getEvent(t, eventChan)
|
|
So(e, ShouldNotBeNil)
|
|
So(e.Type(), ShouldEqual, events.ImageDeletedEventType.String())
|
|
})
|
|
|
|
Convey("image lint failed", func() {
|
|
recorder.ImageLintFailed("test", "v1", "", string(types.OCIManifestSchema1), "")
|
|
e := getEvent(t, eventChan)
|
|
So(e, ShouldNotBeNil)
|
|
So(e.Type(), ShouldEqual, events.ImageLintFailedEventType.String())
|
|
})
|
|
})
|
|
}
|
|
|
|
func TestNATSSinkEvents(t *testing.T) {
|
|
Convey("emits events to nats sink", t, func() {
|
|
Convey("repository created", func() {
|
|
natsServer, natsURL := setupTestNATSServer(t)
|
|
defer natsServer.Shutdown()
|
|
|
|
testChannel := "test-events-" + randomString()
|
|
|
|
recorder, err := createRecorder(t, natsURL, testChannel)
|
|
defer recorder.Close()
|
|
So(err, ShouldBeNil)
|
|
|
|
eventChan := make(chan *cloudevents.Event, 1)
|
|
|
|
nc, err := createSubscription(t, natsURL, testChannel, eventChan)
|
|
defer nc.Close()
|
|
So(err, ShouldBeNil)
|
|
|
|
recorder.RepositoryCreated("test")
|
|
|
|
e := getEvent(t, eventChan)
|
|
So(e, ShouldNotBeNil)
|
|
So(e.Type(), ShouldEqual, events.RepositoryCreatedEventType.String())
|
|
})
|
|
|
|
Convey("image updated", func() {
|
|
natsServer, natsURL := setupTestNATSServer(t)
|
|
defer natsServer.Shutdown()
|
|
|
|
testChannel := "test-events-" + randomString()
|
|
|
|
recorder, err := createRecorder(t, natsURL, testChannel)
|
|
So(err, ShouldBeNil)
|
|
defer recorder.Close()
|
|
|
|
eventChan := make(chan *cloudevents.Event, 1)
|
|
|
|
nc, err := createSubscription(t, natsURL, testChannel, eventChan)
|
|
defer nc.Close()
|
|
So(err, ShouldBeNil)
|
|
|
|
recorder.ImageUpdated("test", "v1", "", string(types.OCIManifestSchema1), "")
|
|
|
|
e := getEvent(t, eventChan)
|
|
So(e, ShouldNotBeNil)
|
|
So(e.Type(), ShouldEqual, events.ImageUpdatedEventType.String())
|
|
})
|
|
|
|
Convey("image deleted", func() {
|
|
natsServer, natsURL := setupTestNATSServer(t)
|
|
defer natsServer.Shutdown()
|
|
|
|
testChannel := "test-events-" + randomString()
|
|
|
|
eventChan := make(chan *cloudevents.Event, 1)
|
|
|
|
nc, err := createSubscription(t, natsURL, testChannel, eventChan)
|
|
defer nc.Close()
|
|
So(err, ShouldBeNil)
|
|
|
|
recorder, err := createRecorder(t, natsURL, testChannel)
|
|
defer recorder.Close()
|
|
So(err, ShouldBeNil)
|
|
|
|
recorder.ImageDeleted("test", "v1", "", string(types.OCIManifestSchema1))
|
|
|
|
e := getEvent(t, eventChan)
|
|
So(e, ShouldNotBeNil)
|
|
So(e.Type(), ShouldEqual, events.ImageDeletedEventType.String())
|
|
})
|
|
|
|
Convey("image lint failed", func() {
|
|
natsServer, natsURL := setupTestNATSServer(t)
|
|
defer natsServer.Shutdown()
|
|
|
|
testChannel := "test-events-" + randomString()
|
|
|
|
recorder, err := createRecorder(t, natsURL, testChannel)
|
|
defer recorder.Close()
|
|
So(err, ShouldBeNil)
|
|
|
|
eventChan := make(chan *cloudevents.Event, 1)
|
|
|
|
nc, err := createSubscription(t, natsURL, testChannel, eventChan)
|
|
defer nc.Close()
|
|
So(err, ShouldBeNil)
|
|
|
|
recorder.ImageLintFailed("test", "v1", "", string(types.OCIManifestSchema1), "")
|
|
|
|
e := getEvent(t, eventChan)
|
|
So(e, ShouldNotBeNil)
|
|
So(e.Type(), ShouldEqual, events.ImageLintFailedEventType.String())
|
|
})
|
|
})
|
|
}
|
|
|
|
func setupTestNATSServer(t *testing.T) (*server.Server, string) {
|
|
t.Helper()
|
|
|
|
opts := server.Options{
|
|
Host: "127.0.0.1",
|
|
Port: -1, // Use random available port
|
|
NoLog: true,
|
|
NoSigs: true,
|
|
MaxControlLine: 4096,
|
|
}
|
|
|
|
natsServer, err := server.NewServer(&opts)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
go natsServer.Start()
|
|
|
|
if !natsServer.ReadyForConnections(5 * time.Second) {
|
|
panic("NATS server failed to start")
|
|
}
|
|
|
|
return natsServer, natsServer.ClientURL()
|
|
}
|
|
|
|
func createRecorder(t *testing.T, natsURL, testChannel string) (events.Recorder, error) {
|
|
t.Helper()
|
|
config := eventsconf.SinkConfig{
|
|
Type: eventsconf.NATS,
|
|
Address: natsURL,
|
|
Channel: testChannel,
|
|
Timeout: 15 * time.Second,
|
|
}
|
|
|
|
sink, err := events.NewNATSSink(config)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
recorder, err := events.NewRecorder(log.NewLogger("debug", ""), sink)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return recorder, nil
|
|
}
|
|
|
|
func createSubscription(t *testing.T, natsURL, channelName string, bus chan *cloudevents.Event) (*nats.Conn, error) {
|
|
t.Helper()
|
|
|
|
natsConnection, err := nats.Connect(natsURL)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
_, err = natsConnection.Subscribe(channelName, func(msg *nats.Msg) {
|
|
event := cloudevents.NewEvent()
|
|
|
|
headers := msg.Header
|
|
event.SetID(headers.Get("ce-id"))
|
|
event.SetSource(headers.Get("ce-source"))
|
|
event.SetType(headers.Get("ce-type"))
|
|
|
|
if subj := headers.Get("ce-subject"); subj != "" {
|
|
event.SetSubject(subj)
|
|
}
|
|
|
|
if err := event.UnmarshalJSON(msg.Data); err == nil {
|
|
bus <- &event
|
|
}
|
|
|
|
_ = msg.Respond([]byte("OK"))
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
err = natsConnection.FlushTimeout(2 * time.Second)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("flush failed: %w", err)
|
|
}
|
|
|
|
return natsConnection, nil
|
|
}
|
|
|
|
func getEvent(t *testing.T, c chan *cloudevents.Event) *cloudevents.Event {
|
|
t.Helper()
|
|
|
|
var evt *cloudevents.Event
|
|
select {
|
|
case evt = <-c:
|
|
case <-time.After(time.Second * 2):
|
|
t.Fatal("timed out waiting for event")
|
|
}
|
|
|
|
return evt
|
|
}
|
|
|
|
const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
|
|
|
|
func randomString() string {
|
|
rand.Seed(time.Now().UnixNano())
|
|
|
|
buf := make([]byte, 5)
|
|
|
|
for i := range buf {
|
|
buf[i] = charset[rand.Intn(len(charset))]
|
|
}
|
|
|
|
return string(buf)
|
|
}
|