mirror of
https://github.com/project-zot/zot.git
synced 2026-06-18 13:37:57 +08:00
bc5fd1a357
* feat: add events config Signed-off-by: Piaras Hoban <phoban01@gmail.com> * feat: implement event support with log sink Signed-off-by: Piaras Hoban <phoban01@gmail.com> * feat: integrate events and update tests Signed-off-by: Piaras Hoban <phoban01@gmail.com> * refactor: update event config Signed-off-by: Piaras Hoban <phoban01@gmail.com> * feat: implement http and nats sinks. remove log sink Signed-off-by: Piaras Hoban <phoban01@gmail.com> * refactor: events extension setup Signed-off-by: Piaras Hoban <phoban01@gmail.com> * chore: cleanup tests to use nil event recorder Signed-off-by: Piaras Hoban <phoban01@gmail.com> * chore: update events config example and add more logging Signed-off-by: Piaras Hoban <phoban01@gmail.com> * refactor: better use of build tags for minimal binary Signed-off-by: Piaras Hoban <phoban01@gmail.com> * fix: missing store param in evelated privileges tests Signed-off-by: Piaras Hoban <phoban01@gmail.com> * fix: regression in config decoding Signed-off-by: Piaras Hoban <phoban01@gmail.com> * chore: update check logs script to enable cross-platform usage via GREP_BIN_PATH envvar Signed-off-by: Piaras Hoban <phoban01@gmail.com> * chore: fix log lint issue for events Signed-off-by: Piaras Hoban <phoban01@gmail.com> * chore: fix failing events disabled test Signed-off-by: Piaras Hoban <phoban01@gmail.com> * test: add blackbox tests for events Signed-off-by: Piaras Hoban <phoban01@gmail.com> * chore: specify architecture when downloading binaries in Makefile Signed-off-by: Piaras Hoban <phoban01@gmail.com> * chore: improve failure handling when no valid sinks are provided Signed-off-by: Piaras Hoban <phoban01@gmail.com> * test: fix data race in events test Signed-off-by: Piaras Hoban <phoban01@gmail.com> * chore: cleanup event decoding Signed-off-by: Piaras Hoban <phoban01@gmail.com> * test: fix logging tests Signed-off-by: Piaras Hoban <phoban01@gmail.com> * test: make nats server test more reliable Signed-off-by: Piaras Hoban 
<phoban01@gmail.com> * chore: go mod cleanup Signed-off-by: Piaras Hoban <phoban01@gmail.com> * test: add sleep when setting up nats client Signed-off-by: Piaras Hoban <phoban01@gmail.com> * fix: ensure event sink errors do not propagate Signed-off-by: Piaras Hoban <phoban01@gmail.com> * test: increase coverage for events Signed-off-by: Piaras Hoban <phoban01@gmail.com> * feat(events): Refactor events to be non-blocking from caller. Signed-off-by: Asgeir Nilsen <asgeir.nilsen@bouvet.no> Signed-off-by: Piaras Hoban <phoban01@gmail.com> * chore: remove hard-coded linux Co-authored-by: Andrei Aaron <andreifdaaron@gmail.com> Signed-off-by: Piaras Hoban <phoban01@gmail.com> * feat(events): fail to start if incorrect event sink is configured Signed-off-by: Piaras Hoban <phoban01@gmail.com> * test: allow cli tests to return errors instead of panic Signed-off-by: Piaras Hoban <phoban01@gmail.com> * chore: bump nats server to v2.11.3 Signed-off-by: Piaras Hoban <phoban01@gmail.com> --------- Signed-off-by: Piaras Hoban <phoban01@gmail.com> Signed-off-by: Asgeir Nilsen <asgeir.nilsen@bouvet.no> Co-authored-by: Asgeir Nilsen <asgeir.nilsen@bouvet.no> Co-authored-by: Andrei Aaron <andreifdaaron@gmail.com>
131 lines
2.9 KiB
Go
131 lines
2.9 KiB
Go
//go:build events
|
|
// +build events
|
|
|
|
package events
|
|
|
|
import (
|
|
"context"
|
|
"encoding/base64"
|
|
"net/http"
|
|
"net/url"
|
|
|
|
cloudevents "github.com/cloudevents/sdk-go/v2"
|
|
cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
|
|
|
|
zerr "zotregistry.dev/zot/errors"
|
|
eventsconf "zotregistry.dev/zot/pkg/extensions/config/events"
|
|
)
|
|
|
|
type HTTPSink struct {
|
|
cloudevents.Client
|
|
config eventsconf.SinkConfig
|
|
}
|
|
|
|
func NewHTTPSink(config eventsconf.SinkConfig) (*HTTPSink, error) {
|
|
if config.Type != eventsconf.HTTP {
|
|
return nil, zerr.ErrInvalidEventSinkType
|
|
}
|
|
|
|
if config.Address == "" {
|
|
return nil, zerr.ErrEventSinkAddressEmpty
|
|
}
|
|
|
|
// Create the basic http client
|
|
httpClient, err := GetHTTPClientForConfig(config)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
opts := []cehttp.Option{
|
|
cehttp.WithTarget(config.Address),
|
|
cehttp.WithClient(*httpClient),
|
|
}
|
|
|
|
if config.Credentials != nil && config.Credentials.Username != "" {
|
|
opts = append(opts, cehttp.WithHeader("Authorization",
|
|
"Basic "+BasicAuth(config.Credentials.Username, config.Credentials.Password)))
|
|
}
|
|
|
|
// Create CloudEvents HTTP protocol
|
|
provider, err := cehttp.New(opts...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Create CloudEvents client
|
|
ceClient, err := cloudevents.NewClient(provider)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &HTTPSink{
|
|
Client: ceClient,
|
|
config: config,
|
|
}, nil
|
|
}
|
|
|
|
// Emit sends the event to the sink.
|
|
func (s *HTTPSink) Emit(event *cloudevents.Event) cloudevents.Result {
|
|
ctx, cancel := context.WithTimeout(context.Background(), s.config.Timeout)
|
|
defer cancel()
|
|
|
|
if err := event.Validate(); err != nil {
|
|
return err
|
|
}
|
|
|
|
if s.config.Channel != "" {
|
|
event.SetExtension("channel", s.config.Channel)
|
|
}
|
|
|
|
// Send the event
|
|
return s.Send(ctx, *event)
|
|
}
|
|
|
|
// Close implements a method to clean up resources.
|
|
func (s *HTTPSink) Close() error {
|
|
// For HTTP clients, typically no specific cleanup is needed
|
|
// We could cancel any in-flight requests if we tracked them
|
|
return nil
|
|
}
|
|
|
|
func GetHTTPClientForConfig(config eventsconf.SinkConfig) (*http.Client, error) {
|
|
transport, ok := http.DefaultTransport.(*http.Transport)
|
|
if !ok {
|
|
return nil, zerr.ErrCouldNotCreateHTTPEventTransport
|
|
}
|
|
transport = transport.Clone()
|
|
|
|
if config.Proxy != nil && *config.Proxy != "" {
|
|
proxyURL, err := url.Parse(*config.Proxy)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
transport.Proxy = http.ProxyURL(proxyURL)
|
|
}
|
|
|
|
if config.TLSConfig != nil && (config.TLSConfig.CACertFile != "" || config.TLSConfig.CertFile != "") {
|
|
tlsConfig, err := getTLSConfig(config)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
transport.TLSClientConfig = tlsConfig
|
|
}
|
|
|
|
timeout := config.Timeout
|
|
if timeout == 0 {
|
|
timeout = DefaultHTTPTimeout
|
|
}
|
|
|
|
return &http.Client{
|
|
Transport: transport,
|
|
Timeout: timeout,
|
|
}, nil
|
|
}
|
|
|
|
// BasicAuth returns the standard base64 encoding of "username:password",
// i.e. the credentials part of an HTTP Basic Authorization header value
// (without the "Basic " prefix).
func BasicAuth(username, password string) string {
	credentials := make([]byte, 0, len(username)+len(password)+1)
	credentials = append(credentials, username...)
	credentials = append(credentials, ':')
	credentials = append(credentials, password...)

	return base64.StdEncoding.EncodeToString(credentials)
}
|