diff --git a/pkg/extensions/config/events/config.go b/pkg/extensions/config/events/config.go index a8c7e495..ad292ff4 100644 --- a/pkg/extensions/config/events/config.go +++ b/pkg/extensions/config/events/config.go @@ -40,12 +40,14 @@ type SinkConfig struct { Channel string Timeout time.Duration Proxy *string + Headers map[string]string } type Credentials struct { Username string Password string File *string + Token string } type TLSConfig struct { diff --git a/pkg/extensions/events/http_sink.go b/pkg/extensions/events/http_sink.go index dcfc79cb..c342ef01 100644 --- a/pkg/extensions/events/http_sink.go +++ b/pkg/extensions/events/http_sink.go @@ -41,9 +41,20 @@ func NewHTTPSink(config eventsconf.SinkConfig) (*HTTPSink, error) { cehttp.WithClient(*httpClient), } - if config.Credentials != nil && config.Credentials.Username != "" { - opts = append(opts, cehttp.WithHeader("Authorization", - "Basic "+BasicAuth(config.Credentials.Username, config.Credentials.Password))) + if config.Credentials != nil { + if config.Credentials.Username != "" { + opts = append(opts, cehttp.WithHeader("Authorization", + "Basic "+BasicAuth(config.Credentials.Username, config.Credentials.Password))) + } else if config.Credentials.Token != "" { + opts = append(opts, cehttp.WithHeader("Authorization", + "Bearer "+config.Credentials.Token)) + } + } + + if config.Headers != nil { + for key, value := range config.Headers { + opts = append(opts, cehttp.WithHeader(key, value)) + } } // Create CloudEvents HTTP protocol diff --git a/pkg/extensions/events/http_sink_test.go b/pkg/extensions/events/http_sink_test.go index 9c49713e..a06635a4 100644 --- a/pkg/extensions/events/http_sink_test.go +++ b/pkg/extensions/events/http_sink_test.go @@ -63,6 +63,34 @@ func TestHTTPSink(t *testing.T) { So(sink, ShouldNotBeNil) }) + Convey("NewHTTPSink handles token auth config", t, func() { + cfg := eventsconf.SinkConfig{ + Type: eventsconf.HTTP, + Address: "http://localhost", + Credentials: &eventsconf.Credentials{ + Token: "thisisamocktoken", + }, + } + + sink, err := events.NewHTTPSink(cfg) + So(err, ShouldBeNil) + So(sink, ShouldNotBeNil) + }) + + Convey("NewHTTPSink handles custom headers config", t, func() { + cfg := eventsconf.SinkConfig{ + Type: eventsconf.HTTP, + Address: "http://localhost", + Headers: map[string]string{ + "X-Tenant-ID": "tenant-abc123", + }, + } + + sink, err := events.NewHTTPSink(cfg) + So(err, ShouldBeNil) + So(sink, ShouldNotBeNil) + }) + Convey("GetHTTPClientForConfig returns error for invalid proxy", t, func() { badProxy := "://bad-url" cfg := eventsconf.SinkConfig{