mirror of
https://github.com/project-zot/zot.git
synced 2026-06-17 21:17:58 +08:00
feat(events): add events extension (#3045)
* feat: add events config Signed-off-by: Piaras Hoban <phoban01@gmail.com> * feat: implement event support with log sink Signed-off-by: Piaras Hoban <phoban01@gmail.com> * feat: integrate events and update tests Signed-off-by: Piaras Hoban <phoban01@gmail.com> * refactor: update event config Signed-off-by: Piaras Hoban <phoban01@gmail.com> * feat: implement http and nats sinks. remove log sink Signed-off-by: Piaras Hoban <phoban01@gmail.com> * refactor: events extension setup Signed-off-by: Piaras Hoban <phoban01@gmail.com> * chore: cleanup tests to use nil event recorder Signed-off-by: Piaras Hoban <phoban01@gmail.com> * chore: update events config example and add more logging Signed-off-by: Piaras Hoban <phoban01@gmail.com> * refactor: better use of build tags for minimal binary Signed-off-by: Piaras Hoban <phoban01@gmail.com> * fix: missing store param in evelated privileges tests Signed-off-by: Piaras Hoban <phoban01@gmail.com> * fix: regression in config decoding Signed-off-by: Piaras Hoban <phoban01@gmail.com> * chore: update check logs script to enable cross-platform usage via GREP_BIN_PATH envvar Signed-off-by: Piaras Hoban <phoban01@gmail.com> * chore: fix log lint issue for events Signed-off-by: Piaras Hoban <phoban01@gmail.com> * chore: fix failing events disabled test Signed-off-by: Piaras Hoban <phoban01@gmail.com> * test: add blackbox tests for events Signed-off-by: Piaras Hoban <phoban01@gmail.com> * chore: specify architecture when downloading binaries in Makefile Signed-off-by: Piaras Hoban <phoban01@gmail.com> * chore: improve failure handling when no valid sinks are provided Signed-off-by: Piaras Hoban <phoban01@gmail.com> * test: fix data race in events test Signed-off-by: Piaras Hoban <phoban01@gmail.com> * chore: cleanup event decoding Signed-off-by: Piaras Hoban <phoban01@gmail.com> * test: fix logging tests Signed-off-by: Piaras Hoban <phoban01@gmail.com> * test: make nats server test more reliable Signed-off-by: Piaras Hoban <phoban01@gmail.com> * chore: go mod cleanup Signed-off-by: Piaras Hoban <phoban01@gmail.com> * test: add sleep when setting up nats client Signed-off-by: Piaras Hoban <phoban01@gmail.com> * fix: ensure event sink errors do not propogate Signed-off-by: Piaras Hoban <phoban01@gmail.com> * test: increase coverage for events Signed-off-by: Piaras Hoban <phoban01@gmail.com> * feat(events): Refactor events to be non-blocking from caller. Signed-off-by: Asgeir Nilsen <asgeir.nilsen@bouvet.no> Signed-off-by: Piaras Hoban <phoban01@gmail.com> * chore: remove harded-coded linux Co-authored-by: Andrei Aaron <andreifdaaron@gmail.com> Signed-off-by: Piaras Hoban <phoban01@gmail.com> * feat(events): fail to start if incorrect event sink is configured Signed-off-by: Piaras Hoban <phoban01@gmail.com> * test: allow cli tests to return errors instead of panic Signed-off-by: Piaras Hoban <phoban01@gmail.com> * chore: bump nats server to v2.11.3 Signed-off-by: Piaras Hoban <phoban01@gmail.com> --------- Signed-off-by: Piaras Hoban <phoban01@gmail.com> Signed-off-by: Asgeir Nilsen <asgeir.nilsen@bouvet.no> Co-authored-by: Asgeir Nilsen <asgeir.nilsen@bouvet.no> Co-authored-by: Andrei Aaron <andreifdaaron@gmail.com>
This commit is contained in:
@@ -119,7 +119,7 @@ func TestNegativeServerResponse(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
|
||||
imageStore := local.NewImageStore(dir, false, false,
|
||||
log.NewLogger("debug", ""), monitoring.NewMetricsServer(false, log.NewLogger("debug", "")), nil, nil, nil)
|
||||
log.NewLogger("debug", ""), monitoring.NewMetricsServer(false, log.NewLogger("debug", "")), nil, nil, nil, nil)
|
||||
|
||||
storeController := storage.StoreController{
|
||||
DefaultStore: imageStore,
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
//go:build sync && scrub && metrics && search && userprefs && mgmt && imagetrust
|
||||
// +build sync,scrub,metrics,search,userprefs,mgmt,imagetrust
|
||||
//go:build sync && scrub && metrics && search && userprefs && mgmt && imagetrust && events
|
||||
// +build sync,scrub,metrics,search,userprefs,mgmt,imagetrust,events
|
||||
|
||||
package server_test
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
. "github.com/smartystreets/goconvey/convey"
|
||||
"gopkg.in/resty.v1"
|
||||
|
||||
zerr "zotregistry.dev/zot/errors"
|
||||
"zotregistry.dev/zot/pkg/api/config"
|
||||
cli "zotregistry.dev/zot/pkg/cli/server"
|
||||
. "zotregistry.dev/zot/pkg/test/common"
|
||||
@@ -1946,3 +1947,79 @@ func TestSyncWithRemoteStorageConfig(t *testing.T) {
|
||||
"using both sync and remote storage features needs config.Extensions.Sync.DownloadDir to be specified")
|
||||
})
|
||||
}
|
||||
|
||||
func TestEventsExtension(t *testing.T) {
|
||||
oldArgs := os.Args
|
||||
|
||||
defer func() { os.Args = oldArgs }()
|
||||
|
||||
Convey("Events explicitly disabled", t, func(c C) {
|
||||
content := `{
|
||||
"storage": {
|
||||
"rootDirectory": "%s"
|
||||
},
|
||||
"http": {
|
||||
"address": "127.0.0.1",
|
||||
"port": "%s"
|
||||
},
|
||||
"log": {
|
||||
"level": "debug",
|
||||
"output": "%s"
|
||||
},
|
||||
"extensions": {
|
||||
"events": {
|
||||
"enable": false
|
||||
}
|
||||
}
|
||||
}`
|
||||
|
||||
logPath, err := runCLIWithConfig(t.TempDir(), content)
|
||||
So(err, ShouldBeNil)
|
||||
defer os.Remove(logPath) // clean up
|
||||
|
||||
found, err := ReadLogFileAndSearchString(logPath,
|
||||
"events disabled in configuration", 10*time.Second)
|
||||
|
||||
if !found {
|
||||
data, err := os.ReadFile(logPath)
|
||||
So(err, ShouldBeNil)
|
||||
t.Log(string(data))
|
||||
}
|
||||
|
||||
So(err, ShouldBeNil)
|
||||
So(found, ShouldBeTrue)
|
||||
})
|
||||
|
||||
Convey("Unsupported event sink", t, func(c C) {
|
||||
content := `{
|
||||
"storage": {
|
||||
"rootDirectory": "%s"
|
||||
},
|
||||
"http": {
|
||||
"address": "127.0.0.1",
|
||||
"port": "%s"
|
||||
},
|
||||
"log": {
|
||||
"level": "debug",
|
||||
"output": "%s"
|
||||
},
|
||||
"extensions": {
|
||||
"events": {
|
||||
"enable": true,
|
||||
"sinks": [{
|
||||
"type": "unsupported"
|
||||
}]
|
||||
}
|
||||
}
|
||||
}`
|
||||
|
||||
logPath, err := runCLIWithConfig(t.TempDir(), content)
|
||||
defer func(p string) {
|
||||
if p != "" {
|
||||
os.Remove(p)
|
||||
}
|
||||
}(logPath) // clean up
|
||||
So(err, ShouldNotBeNil)
|
||||
So(err.Error(), ShouldContainSubstring, zerr.ErrUnsupportedEventSink.Error())
|
||||
})
|
||||
}
|
||||
|
||||
+14
-2
@@ -27,6 +27,7 @@ import (
|
||||
"zotregistry.dev/zot/pkg/api/constants"
|
||||
"zotregistry.dev/zot/pkg/common"
|
||||
extconf "zotregistry.dev/zot/pkg/extensions/config"
|
||||
eventsconf "zotregistry.dev/zot/pkg/extensions/config/events"
|
||||
"zotregistry.dev/zot/pkg/extensions/monitoring"
|
||||
zlog "zotregistry.dev/zot/pkg/log"
|
||||
storageConstants "zotregistry.dev/zot/pkg/storage/constants"
|
||||
@@ -820,8 +821,19 @@ func LoadConfiguration(config *config.Config, configPath string) error {
|
||||
}
|
||||
|
||||
metaData := &mapstructure.Metadata{}
|
||||
if err := viperInstance.UnmarshalExact(&config, metadataConfig(metaData)); err != nil {
|
||||
log.Error().Err(err).Msg("failed to unmarshaling new config")
|
||||
|
||||
decoderOpts := []viper.DecoderConfigOption{
|
||||
metadataConfig(metaData),
|
||||
viper.DecodeHook(
|
||||
mapstructure.ComposeDecodeHookFunc(
|
||||
mapstructure.StringToTimeDurationHookFunc(),
|
||||
eventsconf.SinkConfigDecoderHook(),
|
||||
),
|
||||
),
|
||||
}
|
||||
|
||||
if err := viperInstance.UnmarshalExact(&config, decoderOpts...); err != nil {
|
||||
log.Error().Err(err).Msg("failed to unmarshal new config")
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -2413,13 +2413,20 @@ func runCLIWithConfig(tempDir string, config string) (string, error) {
|
||||
|
||||
os.Args = []string{"cli_test", "serve", cfgfile.Name()}
|
||||
|
||||
// Run CLI in a goroutine, but handle errors via a channel
|
||||
errCh := make(chan error, 1)
|
||||
go func() {
|
||||
err = cli.NewServerRootCmd().Execute()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
errCh <- cli.NewServerRootCmd().Execute()
|
||||
}()
|
||||
|
||||
select {
|
||||
case err := <-errCh:
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
case <-time.After(250 * time.Millisecond): // No startup error
|
||||
}
|
||||
|
||||
WaitTillServerReady(baseURL)
|
||||
|
||||
return logFile.Name(), nil
|
||||
|
||||
Reference in New Issue
Block a user