mirror of
https://github.com/project-zot/zot.git
synced 2026-06-20 06:37:56 +08:00
2402296e9a
* fix: migrate to Go module v2 for proper semantic versioning This change updates the module path from 'zotregistry.dev/zot' to 'zotregistry.dev/zot/v2' to comply with Go's semantic versioning rules. According to Go's module versioning requirements, major version v2+ must include the major version in the module path. The current module path 'zotregistry.dev/zot' only supports v0.x.x and v1.x.x versions, making existing v2.x.x tags (like v2.1.8) unusable. Changes: - Updated go.mod module path to zotregistry.dev/zot/v2 - Updated all internal import paths across 280+ Go source files - Updated configuration files (golangcilint.yaml, gqlgen.yml) - Updated README.md Go reference badge This fix enables proper use of existing v2.x.x Git tags and allows external packages to import zot v2+ versions without compatibility errors. Resolves: Go module import compatibility for v2+ versions Fixes: #3071 Signed-off-by: Luca Muscariello <muscariello@ieee.org> * fix: regenerate GraphQL files with updated v2 import paths The gqlgen tool needs to regenerate the GraphQL schema files after the module path change to use the new v2 imports. Signed-off-by: Luca Muscariello <muscariello@ieee.org> --------- Signed-off-by: Luca Muscariello <muscariello@ieee.org>
351 lines
8.7 KiB
Go
351 lines
8.7 KiB
Go
//go:build events
|
|
// +build events
|
|
|
|
package events_test
|
|
|
|
import (
|
|
"fmt"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"testing"
|
|
"time"
|
|
|
|
cloudevents "github.com/cloudevents/sdk-go/v2"
|
|
cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
|
|
"github.com/google/go-containerregistry/pkg/v1/types"
|
|
"github.com/nats-io/nats-server/v2/server"
|
|
"github.com/nats-io/nats.go"
|
|
. "github.com/smartystreets/goconvey/convey"
|
|
"k8s.io/apimachinery/pkg/util/rand"
|
|
|
|
zerr "zotregistry.dev/zot/v2/errors"
|
|
eventsconf "zotregistry.dev/zot/v2/pkg/extensions/config/events"
|
|
"zotregistry.dev/zot/v2/pkg/extensions/events"
|
|
"zotregistry.dev/zot/v2/pkg/log"
|
|
)
|
|
|
|
type mockSink struct {
|
|
store chan *cloudevents.Event
|
|
}
|
|
|
|
func (s *mockSink) Emit(e *cloudevents.Event) cloudevents.Result {
|
|
s.store <- e
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *mockSink) Close() error {
|
|
return nil
|
|
}
|
|
|
|
// Compile-time check that *mockSink satisfies the events.Sink interface.
var _ events.Sink = (*mockSink)(nil)
|
|
|
|
func newMockSink() *mockSink {
|
|
return &mockSink{
|
|
store: make(chan *cloudevents.Event),
|
|
}
|
|
}
|
|
|
|
func TestEventSinkMissing(t *testing.T) {
|
|
Convey("missing sink", t, func() {
|
|
_, err := events.NewRecorder(log.NewTestLogger())
|
|
So(err, ShouldNotBeNil)
|
|
So(err, ShouldEqual, zerr.ErrEventSinkIsNil)
|
|
})
|
|
}
|
|
|
|
func TestEvents(t *testing.T) {
|
|
Convey("emits events", t, func() {
|
|
sink := newMockSink()
|
|
recorder, err := events.NewRecorder(log.NewTestLogger(), sink)
|
|
So(err, ShouldBeNil)
|
|
Convey("repository created", func() {
|
|
recorder.RepositoryCreated("test")
|
|
ev := <-sink.store
|
|
So(ev.Type(), ShouldEqual, events.RepositoryCreatedEventType.String())
|
|
})
|
|
Convey("image updated", func() {
|
|
recorder.ImageUpdated("test", "v1", "", string(types.OCIManifestSchema1), "")
|
|
ev := <-sink.store
|
|
So(ev.Type(), ShouldEqual, events.ImageUpdatedEventType.String())
|
|
})
|
|
Convey("image deleted", func() {
|
|
recorder.ImageDeleted("test", "v1", "", string(types.OCIManifestSchema1))
|
|
ev := <-sink.store
|
|
So(ev.Type(), ShouldEqual, events.ImageDeletedEventType.String())
|
|
})
|
|
Convey("image lint failed", func() {
|
|
recorder.ImageLintFailed("test", "v1", "", string(types.OCIManifestSchema1), "")
|
|
ev := <-sink.store
|
|
So(ev.Type(), ShouldEqual, events.ImageLintFailedEventType.String())
|
|
})
|
|
})
|
|
}
|
|
|
|
func TestHTTPSinkEvents(t *testing.T) {
|
|
Convey("emits events to http sink", t, func() {
|
|
eventChan := make(chan *cloudevents.Event, 1)
|
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
event, err := cehttp.NewEventFromHTTPRequest(r)
|
|
if err != nil {
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
|
|
return
|
|
}
|
|
|
|
eventChan <- event
|
|
|
|
w.WriteHeader(http.StatusOK)
|
|
}))
|
|
|
|
defer server.Close()
|
|
|
|
config := eventsconf.SinkConfig{
|
|
Type: eventsconf.HTTP,
|
|
Address: server.URL,
|
|
Timeout: 5 * time.Second,
|
|
}
|
|
sink, err := events.NewHTTPSink(config)
|
|
So(err, ShouldBeNil)
|
|
|
|
recorder, err := events.NewRecorder(log.NewTestLogger(), sink)
|
|
So(err, ShouldBeNil)
|
|
|
|
Convey("repository created", func() {
|
|
recorder.RepositoryCreated("test")
|
|
e := getEvent(t, eventChan)
|
|
So(e, ShouldNotBeNil)
|
|
So(e.Type(), ShouldEqual, events.RepositoryCreatedEventType.String())
|
|
})
|
|
|
|
Convey("image updated", func() {
|
|
recorder.ImageUpdated("test", "v1", "", string(types.OCIManifestSchema1), "")
|
|
e := getEvent(t, eventChan)
|
|
So(e, ShouldNotBeNil)
|
|
So(e.Type(), ShouldEqual, events.ImageUpdatedEventType.String())
|
|
})
|
|
|
|
Convey("image deleted", func() {
|
|
recorder.ImageDeleted("test", "v1", "", string(types.OCIManifestSchema1))
|
|
e := getEvent(t, eventChan)
|
|
So(e, ShouldNotBeNil)
|
|
So(e.Type(), ShouldEqual, events.ImageDeletedEventType.String())
|
|
})
|
|
|
|
Convey("image lint failed", func() {
|
|
recorder.ImageLintFailed("test", "v1", "", string(types.OCIManifestSchema1), "")
|
|
e := getEvent(t, eventChan)
|
|
So(e, ShouldNotBeNil)
|
|
So(e.Type(), ShouldEqual, events.ImageLintFailedEventType.String())
|
|
})
|
|
})
|
|
}
|
|
|
|
func TestNATSSinkEvents(t *testing.T) {
|
|
Convey("emits events to nats sink", t, func() {
|
|
Convey("repository created", func() {
|
|
natsServer, natsURL := setupTestNATSServer(t)
|
|
defer natsServer.Shutdown()
|
|
|
|
testChannel := "test-events-" + randomString()
|
|
|
|
recorder, err := createRecorder(t, natsURL, testChannel)
|
|
defer recorder.Close()
|
|
So(err, ShouldBeNil)
|
|
|
|
eventChan := make(chan *cloudevents.Event, 1)
|
|
|
|
nc, err := createSubscription(t, natsURL, testChannel, eventChan)
|
|
defer nc.Close()
|
|
So(err, ShouldBeNil)
|
|
|
|
recorder.RepositoryCreated("test")
|
|
|
|
e := getEvent(t, eventChan)
|
|
So(e, ShouldNotBeNil)
|
|
So(e.Type(), ShouldEqual, events.RepositoryCreatedEventType.String())
|
|
})
|
|
|
|
Convey("image updated", func() {
|
|
natsServer, natsURL := setupTestNATSServer(t)
|
|
defer natsServer.Shutdown()
|
|
|
|
testChannel := "test-events-" + randomString()
|
|
|
|
recorder, err := createRecorder(t, natsURL, testChannel)
|
|
So(err, ShouldBeNil)
|
|
defer recorder.Close()
|
|
|
|
eventChan := make(chan *cloudevents.Event, 1)
|
|
|
|
nc, err := createSubscription(t, natsURL, testChannel, eventChan)
|
|
defer nc.Close()
|
|
So(err, ShouldBeNil)
|
|
|
|
recorder.ImageUpdated("test", "v1", "", string(types.OCIManifestSchema1), "")
|
|
|
|
e := getEvent(t, eventChan)
|
|
So(e, ShouldNotBeNil)
|
|
So(e.Type(), ShouldEqual, events.ImageUpdatedEventType.String())
|
|
})
|
|
|
|
Convey("image deleted", func() {
|
|
natsServer, natsURL := setupTestNATSServer(t)
|
|
defer natsServer.Shutdown()
|
|
|
|
testChannel := "test-events-" + randomString()
|
|
|
|
eventChan := make(chan *cloudevents.Event, 1)
|
|
|
|
nc, err := createSubscription(t, natsURL, testChannel, eventChan)
|
|
defer nc.Close()
|
|
So(err, ShouldBeNil)
|
|
|
|
recorder, err := createRecorder(t, natsURL, testChannel)
|
|
defer recorder.Close()
|
|
So(err, ShouldBeNil)
|
|
|
|
recorder.ImageDeleted("test", "v1", "", string(types.OCIManifestSchema1))
|
|
|
|
e := getEvent(t, eventChan)
|
|
So(e, ShouldNotBeNil)
|
|
So(e.Type(), ShouldEqual, events.ImageDeletedEventType.String())
|
|
})
|
|
|
|
Convey("image lint failed", func() {
|
|
natsServer, natsURL := setupTestNATSServer(t)
|
|
defer natsServer.Shutdown()
|
|
|
|
testChannel := "test-events-" + randomString()
|
|
|
|
recorder, err := createRecorder(t, natsURL, testChannel)
|
|
defer recorder.Close()
|
|
So(err, ShouldBeNil)
|
|
|
|
eventChan := make(chan *cloudevents.Event, 1)
|
|
|
|
nc, err := createSubscription(t, natsURL, testChannel, eventChan)
|
|
defer nc.Close()
|
|
So(err, ShouldBeNil)
|
|
|
|
recorder.ImageLintFailed("test", "v1", "", string(types.OCIManifestSchema1), "")
|
|
|
|
e := getEvent(t, eventChan)
|
|
So(e, ShouldNotBeNil)
|
|
So(e.Type(), ShouldEqual, events.ImageLintFailedEventType.String())
|
|
})
|
|
})
|
|
}
|
|
|
|
func setupTestNATSServer(t *testing.T) (*server.Server, string) {
|
|
t.Helper()
|
|
|
|
opts := server.Options{
|
|
Host: "127.0.0.1",
|
|
Port: -1, // Use random available port
|
|
NoLog: true,
|
|
NoSigs: true,
|
|
MaxControlLine: 4096,
|
|
}
|
|
|
|
natsServer, err := server.NewServer(&opts)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
go natsServer.Start()
|
|
|
|
if !natsServer.ReadyForConnections(5 * time.Second) {
|
|
panic("NATS server failed to start")
|
|
}
|
|
|
|
return natsServer, natsServer.ClientURL()
|
|
}
|
|
|
|
func createRecorder(t *testing.T, natsURL, testChannel string) (events.Recorder, error) {
|
|
t.Helper()
|
|
config := eventsconf.SinkConfig{
|
|
Type: eventsconf.NATS,
|
|
Address: natsURL,
|
|
Channel: testChannel,
|
|
Timeout: 15 * time.Second,
|
|
}
|
|
|
|
sink, err := events.NewNATSSink(config)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
recorder, err := events.NewRecorder(log.NewTestLogger(), sink)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return recorder, nil
|
|
}
|
|
|
|
func createSubscription(t *testing.T, natsURL, channelName string, bus chan *cloudevents.Event) (*nats.Conn, error) {
|
|
t.Helper()
|
|
|
|
natsConnection, err := nats.Connect(natsURL)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
_, err = natsConnection.Subscribe(channelName, func(msg *nats.Msg) {
|
|
event := cloudevents.NewEvent()
|
|
|
|
headers := msg.Header
|
|
event.SetID(headers.Get("ce-id"))
|
|
event.SetSource(headers.Get("ce-source"))
|
|
event.SetType(headers.Get("ce-type"))
|
|
|
|
if subj := headers.Get("ce-subject"); subj != "" {
|
|
event.SetSubject(subj)
|
|
}
|
|
|
|
if err := event.UnmarshalJSON(msg.Data); err == nil {
|
|
bus <- &event
|
|
}
|
|
|
|
_ = msg.Respond([]byte("OK"))
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
err = natsConnection.FlushTimeout(2 * time.Second)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("flush failed: %w", err)
|
|
}
|
|
|
|
return natsConnection, nil
|
|
}
|
|
|
|
func getEvent(t *testing.T, c chan *cloudevents.Event) *cloudevents.Event {
|
|
t.Helper()
|
|
|
|
var evt *cloudevents.Event
|
|
select {
|
|
case evt = <-c:
|
|
case <-time.After(time.Second * 2):
|
|
t.Fatal("timed out waiting for event")
|
|
}
|
|
|
|
return evt
|
|
}
|
|
|
|
// charset is the alphabet used by randomString: lowercase letters, uppercase
// letters, then digits.
const charset = "abcdefghijklmnopqrstuvwxyz" +
	"ABCDEFGHIJKLMNOPQRSTUVWXYZ" +
	"0123456789"
|
|
|
|
func randomString() string {
|
|
rand.Seed(time.Now().UnixNano())
|
|
|
|
buf := make([]byte, 5)
|
|
|
|
for i := range buf {
|
|
buf[i] = charset[rand.Intn(len(charset))]
|
|
}
|
|
|
|
return string(buf)
|
|
}
|