Implement an API for performance monitoring

Signed-off-by: Alexei Dodon <adodon@cisco.com>
This commit is contained in:
Alexei Dodon
2021-10-15 18:05:00 +03:00
committed by Ramkumar Chinchani
parent 061dfb333b
commit 8e4d828867
54 changed files with 27267 additions and 196 deletions
+37
View File
@@ -0,0 +1,37 @@
// +build minimal
package api
// We export below types in order for cli package to be able to read it from configuration file.
type LogConfig struct {
Level string
Output string
}
type MetricsConfig struct {
Path string
}
type ServerConfig struct {
Protocol string
Host string
Port string
}
type ExporterConfig struct {
Port string
Log *LogConfig
Metrics *MetricsConfig
}
type Config struct {
Server ServerConfig
Exporter ExporterConfig
}
func DefaultConfig() *Config {
return &Config{
Server: ServerConfig{Protocol: "http", Host: "localhost", Port: "8080"},
Exporter: ExporterConfig{Port: "8081", Log: &LogConfig{Level: "debug"}, Metrics: &MetricsConfig{Path: "/metrics"}},
}
}
+21
View File
@@ -0,0 +1,21 @@
// +build minimal
package api
import (
"github.com/anuvu/zot/pkg/log"
)
type Controller struct {
Config *Config
Log log.Logger
}
func NewController(cfg *Config) *Controller {
logger := log.NewLogger(cfg.Exporter.Log.Level, cfg.Exporter.Log.Output)
return &Controller{Config: cfg, Log: logger}
}
func (c *Controller) Run() {
runExporter(c)
}
+483
View File
@@ -0,0 +1,483 @@
// +build minimal
package api_test
import (
"context"
"errors"
"fmt"
"io/ioutil"
"math/rand"
"net/http"
"os"
"strings"
"sync"
"testing"
"time"
zotapi "github.com/anuvu/zot/pkg/api"
zotcfg "github.com/anuvu/zot/pkg/api/config"
"github.com/anuvu/zot/pkg/exporter/api"
"github.com/anuvu/zot/pkg/extensions/monitoring"
jsoniter "github.com/json-iterator/go"
"github.com/phayes/freeport"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
. "github.com/smartystreets/goconvey/convey"
"gopkg.in/resty.v1"
)
const (
BaseURL = "http://127.0.0.1:%s"
SleepTime = 50 * time.Millisecond
SecondToNanoseconds = 1000000000
)
func getRandomLatencyN(maxNanoSeconds int64) time.Duration {
rand.Seed(time.Now().UnixNano())
return time.Duration(rand.Int63n(maxNanoSeconds))
}
func getRandomLatency() time.Duration {
return getRandomLatencyN(120 * SecondToNanoseconds) // a random latency (in nanoseconds) that can be up to 2 minutes
}
func getFreePort() string {
port, err := freeport.GetFreePort()
if err != nil {
panic(err)
}
return fmt.Sprint(port)
}
func TestNew(t *testing.T) {
Convey("Make a new controller", t, func() {
config := api.DefaultConfig()
So(config, ShouldNotBeNil)
So(api.NewController(config), ShouldNotBeNil)
})
}
func isChannelDrained(ch chan prometheus.Metric) bool {
time.Sleep(SleepTime)
select {
case <-ch:
return false
default:
return true
}
}
func readDefaultMetrics(zc *api.Collector, ch chan prometheus.Metric) {
var metric dto.Metric
pm := <-ch
So(pm.Desc().String(), ShouldEqual, zc.MetricsDesc["zot_up"].String())
err := pm.Write(&metric)
So(err, ShouldBeNil)
So(*metric.Gauge.Value, ShouldEqual, 1)
pm = <-ch
So(pm.Desc().String(), ShouldEqual, zc.MetricsDesc["zot_info"].String())
err = pm.Write(&metric)
So(err, ShouldBeNil)
So(*metric.Gauge.Value, ShouldEqual, 0)
}
func TestNewExporter(t *testing.T) {
Convey("Make an exporter controller", t, func() {
exporterConfig := api.DefaultConfig()
So(exporterConfig, ShouldNotBeNil)
exporterPort := getFreePort()
serverPort := getFreePort()
exporterConfig.Exporter.Port = exporterPort
dir, _ := ioutil.TempDir("", "metrics")
exporterConfig.Exporter.Metrics.Path = strings.TrimPrefix(dir, "/tmp/")
exporterConfig.Server.Port = serverPort
exporterController := api.NewController(exporterConfig)
Convey("Start the zot exporter", func() {
go func() {
// this blocks
exporterController.Run()
So(nil, ShouldNotBeNil) // Fail the test in case zot exporter unexpectedly exits
}()
time.Sleep(SleepTime)
zc := api.GetCollector(exporterController)
ch := make(chan prometheus.Metric)
Convey("When zot server not running", func() {
go func() {
// this blocks
zc.Collect(ch)
}()
// Read from the channel expected values
pm := <-ch
So(pm.Desc().String(), ShouldEqual, zc.MetricsDesc["zot_up"].String())
var metric dto.Metric
err := pm.Write(&metric)
So(err, ShouldBeNil)
So(*metric.Gauge.Value, ShouldEqual, 0) // "zot_up=0" means zot server is not running
// Check that no more data was written to the channel
So(isChannelDrained(ch), ShouldEqual, true)
})
Convey("When zot server is running", func() {
servercConfig := zotcfg.New()
So(servercConfig, ShouldNotBeNil)
baseURL := fmt.Sprintf(BaseURL, serverPort)
servercConfig.HTTP.Port = serverPort
serverController := zotapi.NewController(servercConfig)
So(serverController, ShouldNotBeNil)
dir, err := ioutil.TempDir("", "exporter-test")
So(err, ShouldBeNil)
defer os.RemoveAll(dir)
serverController.Config.Storage.RootDirectory = dir
go func(c *zotapi.Controller) {
// this blocks
if err := c.Run(); !errors.Is(err, http.ErrServerClosed) {
panic(err)
}
}(serverController)
defer func(c *zotapi.Controller) {
_ = c.Server.Shutdown(context.TODO())
}(serverController)
// wait till ready
for {
_, err := resty.R().Get(baseURL)
if err == nil {
break
}
time.Sleep(SleepTime)
}
// Side effect of calling this endpoint is that it will enable metrics
resp, err := resty.R().Get(baseURL + "/v2/metrics")
So(resp, ShouldNotBeNil)
So(err, ShouldBeNil)
So(resp.StatusCode(), ShouldEqual, 200)
Convey("Collecting data: default metrics", func() {
go func() {
// this blocks
zc.Collect(ch)
}()
readDefaultMetrics(zc, ch)
So(isChannelDrained(ch), ShouldEqual, true)
})
Convey("Collecting data: Test init value & that increment works on Counters", func() {
//Testing initial value of the counter to be 1 after first incrementation call
monitoring.IncUploadCounter(serverController.Metrics, "testrepo")
time.Sleep(SleepTime)
go func() {
// this blocks
zc.Collect(ch)
}()
readDefaultMetrics(zc, ch)
pm := <-ch
So(pm.Desc().String(), ShouldEqual, zc.MetricsDesc["zot_repo_uploads_total"].String())
var metric dto.Metric
err := pm.Write(&metric)
So(err, ShouldBeNil)
So(*metric.Counter.Value, ShouldEqual, 1)
So(isChannelDrained(ch), ShouldEqual, true)
//Testing that counter is incremented by 1
monitoring.IncUploadCounter(serverController.Metrics, "testrepo")
time.Sleep(SleepTime)
go func() {
// this blocks
zc.Collect(ch)
}()
readDefaultMetrics(zc, ch)
pm = <-ch
So(pm.Desc().String(), ShouldEqual, zc.MetricsDesc["zot_repo_uploads_total"].String())
err = pm.Write(&metric)
So(err, ShouldBeNil)
So(*metric.Counter.Value, ShouldEqual, 2)
So(isChannelDrained(ch), ShouldEqual, true)
})
Convey("Collecting data: Test that concurent Counter increment requests works properly", func() {
reqsSize := rand.Intn(1000)
for i := 0; i < reqsSize; i++ {
monitoring.IncDownloadCounter(serverController.Metrics, "dummyrepo")
}
time.Sleep(SleepTime)
go func() {
// this blocks
zc.Collect(ch)
}()
readDefaultMetrics(zc, ch)
pm := <-ch
So(pm.Desc().String(), ShouldEqual, zc.MetricsDesc["zot_repo_downloads_total"].String())
var metric dto.Metric
err := pm.Write(&metric)
So(err, ShouldBeNil)
So(*metric.Counter.Value, ShouldEqual, reqsSize)
So(isChannelDrained(ch), ShouldEqual, true)
})
Convey("Collecting data: Test init value & that observe works on Summaries", func() {
//Testing initial value of the summary counter to be 1 after first observation call
var latency1, latency2 time.Duration
latency1 = getRandomLatency()
monitoring.ObserveHTTPRepoLatency(serverController.Metrics, "/v2/testrepo/blogs/dummydigest", latency1)
time.Sleep(SleepTime)
go func() {
//this blocks
zc.Collect(ch)
}()
readDefaultMetrics(zc, ch)
pm := <-ch
So(pm.Desc().String(), ShouldEqual, zc.MetricsDesc["zot_http_repo_latency_seconds_count"].String())
var metric dto.Metric
err := pm.Write(&metric)
So(err, ShouldBeNil)
So(*metric.Counter.Value, ShouldEqual, 1)
pm = <-ch
So(pm.Desc().String(), ShouldEqual, zc.MetricsDesc["zot_http_repo_latency_seconds_sum"].String())
err = pm.Write(&metric)
So(err, ShouldBeNil)
So(*metric.Counter.Value, ShouldEqual, latency1.Seconds())
So(isChannelDrained(ch), ShouldEqual, true)
//Testing that summary counter is incremented by 1 and summary sum is properly updated
latency2 = getRandomLatency()
monitoring.ObserveHTTPRepoLatency(serverController.Metrics, "/v2/testrepo/blogs/dummydigest", latency2)
time.Sleep(SleepTime)
go func() {
// this blocks
zc.Collect(ch)
}()
readDefaultMetrics(zc, ch)
pm = <-ch
So(pm.Desc().String(), ShouldEqual, zc.MetricsDesc["zot_http_repo_latency_seconds_count"].String())
err = pm.Write(&metric)
So(err, ShouldBeNil)
So(*metric.Counter.Value, ShouldEqual, 2)
pm = <-ch
So(pm.Desc().String(), ShouldEqual, zc.MetricsDesc["zot_http_repo_latency_seconds_sum"].String())
err = pm.Write(&metric)
So(err, ShouldBeNil)
So(*metric.Counter.Value, ShouldEqual, (latency1.Seconds())+(latency2.Seconds()))
So(isChannelDrained(ch), ShouldEqual, true)
})
Convey("Collecting data: Test that concurent Summary observation requests works properly", func() {
var latencySum float64
reqsSize := rand.Intn(1000)
for i := 0; i < reqsSize; i++ {
latency := getRandomLatency()
latencySum += latency.Seconds()
monitoring.ObserveHTTPRepoLatency(serverController.Metrics, "/v2/dummyrepo/manifests/testreference", latency)
}
time.Sleep(SleepTime)
go func() {
// this blocks
zc.Collect(ch)
}()
readDefaultMetrics(zc, ch)
pm := <-ch
So(pm.Desc().String(), ShouldEqual, zc.MetricsDesc["zot_http_repo_latency_seconds_count"].String())
var metric dto.Metric
err := pm.Write(&metric)
So(err, ShouldBeNil)
So(*metric.Counter.Value, ShouldEqual, reqsSize)
pm = <-ch
So(pm.Desc().String(), ShouldEqual, zc.MetricsDesc["zot_http_repo_latency_seconds_sum"].String())
err = pm.Write(&metric)
So(err, ShouldBeNil)
So(*metric.Counter.Value, ShouldEqual, latencySum)
So(isChannelDrained(ch), ShouldEqual, true)
})
Convey("Collecting data: Test init value & that observe works on Histogram buckets", func() {
//Testing initial value of the histogram counter to be 1 after first observation call
latency := getRandomLatency()
monitoring.ObserveHTTPMethodLatency(serverController.Metrics, "GET", latency)
time.Sleep(SleepTime)
go func() {
//this blocks
zc.Collect(ch)
}()
readDefaultMetrics(zc, ch)
pm := <-ch
So(pm.Desc().String(), ShouldEqual, zc.MetricsDesc["zot_http_method_latency_seconds_count"].String())
var metric dto.Metric
err := pm.Write(&metric)
So(err, ShouldBeNil)
So(*metric.Counter.Value, ShouldEqual, 1)
pm = <-ch
So(pm.Desc().String(), ShouldEqual, zc.MetricsDesc["zot_http_method_latency_seconds_sum"].String())
err = pm.Write(&metric)
So(err, ShouldBeNil)
So(*metric.Counter.Value, ShouldEqual, latency.Seconds())
for _, fvalue := range monitoring.GetDefaultBuckets() {
pm = <-ch
So(pm.Desc().String(), ShouldEqual, zc.MetricsDesc["zot_http_method_latency_seconds_bucket"].String())
err = pm.Write(&metric)
So(err, ShouldBeNil)
if latency.Seconds() < fvalue {
So(*metric.Counter.Value, ShouldEqual, 1)
} else {
So(*metric.Counter.Value, ShouldEqual, 0)
}
}
So(isChannelDrained(ch), ShouldEqual, true)
})
Convey("Collecting data: Test init Histogram buckets \n", func() {
//Generate a random latency within each bucket and finally test
// that "higher" rank bucket counter is incremented by 1
var latencySum float64
dBuckets := monitoring.GetDefaultBuckets()
for i, fvalue := range dBuckets {
var latency time.Duration
if i == 0 {
//first bucket value
latency = getRandomLatencyN(int64(fvalue * SecondToNanoseconds))
} else {
pvalue := dBuckets[i-1] // previous bucket value
latency = time.Duration(pvalue*SecondToNanoseconds) +
getRandomLatencyN(int64(dBuckets[0]*SecondToNanoseconds))
}
latencySum += latency.Seconds()
monitoring.ObserveHTTPMethodLatency(serverController.Metrics, "GET", latency)
}
time.Sleep(SleepTime)
go func() {
//this blocks
zc.Collect(ch)
}()
readDefaultMetrics(zc, ch)
pm := <-ch
So(pm.Desc().String(), ShouldEqual, zc.MetricsDesc["zot_http_method_latency_seconds_count"].String())
var metric dto.Metric
err := pm.Write(&metric)
So(err, ShouldBeNil)
So(*metric.Counter.Value, ShouldEqual, len(dBuckets))
pm = <-ch
So(pm.Desc().String(), ShouldEqual, zc.MetricsDesc["zot_http_method_latency_seconds_sum"].String())
err = pm.Write(&metric)
So(err, ShouldBeNil)
So(*metric.Counter.Value, ShouldEqual, latencySum)
for i := range dBuckets {
pm = <-ch
So(pm.Desc().String(), ShouldEqual, zc.MetricsDesc["zot_http_method_latency_seconds_bucket"].String())
err = pm.Write(&metric)
So(err, ShouldBeNil)
So(*metric.Counter.Value, ShouldEqual, i+1)
}
So(isChannelDrained(ch), ShouldEqual, true)
})
Convey("Negative testing: Send unknown metric type to MetricServer", func() {
serverController.Metrics.SendMetric(getRandomLatency())
})
Convey("Concurrent metrics scrape", func() {
var wg sync.WaitGroup
workersSize := rand.Intn(100)
for i := 0; i < workersSize; i++ {
wg.Add(1)
go func() {
defer wg.Done()
m := serverController.Metrics.ReceiveMetrics()
var json = jsoniter.ConfigCompatibleWithStandardLibrary
_, err := json.Marshal(m)
if err != nil {
exporterController.Log.Error().Err(err).Msg("Concurrent metrics scrape fail")
}
}()
}
wg.Wait()
})
Convey("Negative testing: Increment a counter that does not exist", func() {
cv := monitoring.CounterValue{Name: "dummyName"}
serverController.Metrics.SendMetric(cv)
})
Convey("Negative testing: Set a gauge for a metric with len(labelNames)!=len(knownLabelNames)", func() {
gv := monitoring.GaugeValue{
Name: "zot.info",
Value: 1,
LabelNames: []string{"commit", "binaryType", "version"},
}
serverController.Metrics.SendMetric(gv)
})
Convey("Negative testing: Summary observe for a metric with labelNames!=knownLabelNames", func() {
sv := monitoring.SummaryValue{
Name: "zot.repo.latency.seconds",
LabelNames: []string{"dummyRepoLabelName"},
LabelValues: []string{"dummyrepo"},
}
serverController.Metrics.SendMetric(sv)
})
Convey("Negative testing: Histogram observe for a metric with len(labelNames)!=len(LabelValues)", func() {
hv := monitoring.HistogramValue{
Name: "zot.method.latency.seconds",
LabelNames: []string{"method"},
LabelValues: []string{"GET", "POST", "DELETE"},
}
serverController.Metrics.SendMetric(hv)
})
Convey("Negative testing: error in getting the size for a repo directory", func() {
monitoring.SetStorageUsage(serverController.Metrics, "/tmp/zot", "dummyrepo")
})
Convey("Disabling metrics after idle timeout", func() {
So(serverController.Metrics.IsEnabled(), ShouldEqual, true)
time.Sleep(monitoring.GetMaxIdleScrapeInterval())
So(serverController.Metrics.IsEnabled(), ShouldEqual, false)
})
})
})
})
}
+180
View File
@@ -0,0 +1,180 @@
// +build minimal
package api
import (
"fmt"
"math"
"net/http"
"regexp"
"strconv"
"github.com/anuvu/zot/pkg/extensions/monitoring"
"github.com/anuvu/zot/pkg/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
type Collector struct {
Client *monitoring.MetricsClient
MetricsDesc map[string]*prometheus.Desc // all known metrics descriptions
invalidChars *regexp.Regexp
}
// Implements prometheus.Collector interface.
func (zc Collector) Describe(ch chan<- *prometheus.Desc) {
for _, metricDescription := range zc.MetricsDesc {
ch <- metricDescription
}
}
// Implements prometheus.Collector interface.
func (zc Collector) Collect(ch chan<- prometheus.Metric) {
metrics, err := zc.Client.GetMetrics()
if err != nil {
fmt.Println(err)
ch <- prometheus.MustNewConstMetric(zc.MetricsDesc["zot_up"], prometheus.GaugeValue, 0)
return
}
ch <- prometheus.MustNewConstMetric(zc.MetricsDesc["zot_up"], prometheus.GaugeValue, 1)
for _, g := range metrics.Gauges {
name := zc.invalidChars.ReplaceAllLiteralString(g.Name, "_")
ch <- prometheus.MustNewConstMetric(
zc.MetricsDesc[name], prometheus.GaugeValue, g.Value, g.LabelValues...)
}
for _, c := range metrics.Counters {
name := zc.invalidChars.ReplaceAllLiteralString(c.Name, "_")
name += "_total"
ch <- prometheus.MustNewConstMetric(
zc.MetricsDesc[name], prometheus.CounterValue, float64(c.Count), c.LabelValues...)
}
for _, s := range metrics.Summaries {
mname := zc.invalidChars.ReplaceAllLiteralString(s.Name, "_")
name := mname + "_count"
ch <- prometheus.MustNewConstMetric(
zc.MetricsDesc[name], prometheus.CounterValue, float64(s.Count), s.LabelValues...)
name = mname + "_sum"
ch <- prometheus.MustNewConstMetric(
zc.MetricsDesc[name], prometheus.CounterValue, s.Sum, s.LabelValues...)
}
for _, h := range metrics.Histograms {
mname := zc.invalidChars.ReplaceAllLiteralString(h.Name, "_")
name := mname + "_count"
ch <- prometheus.MustNewConstMetric(
zc.MetricsDesc[name], prometheus.CounterValue, float64(h.Count), h.LabelValues...)
name = mname + "_sum"
ch <- prometheus.MustNewConstMetric(
zc.MetricsDesc[name], prometheus.CounterValue, h.Sum, h.LabelValues...)
if h.Buckets != nil {
for _, fvalue := range monitoring.GetDefaultBuckets() {
var svalue string
if fvalue == math.MaxFloat64 {
svalue = "+Inf"
} else {
svalue = strconv.FormatFloat(fvalue, 'f', -1, 64)
}
name = mname + "_bucket"
ch <- prometheus.MustNewConstMetric(
zc.MetricsDesc[name], prometheus.CounterValue, float64(h.Buckets[svalue]), append(h.LabelValues, svalue)...)
}
}
}
}
func panicOnDuplicateMetricName(m map[string]*prometheus.Desc, name string, log log.Logger) {
if _, present := m[name]; present {
log.Fatal().Msg("Duplicate keys: metric " + name + " already present")
}
}
func GetCollector(c *Controller) *Collector {
//compute all metrics description map
MetricsDesc := map[string]*prometheus.Desc{
"zot_up": prometheus.NewDesc(
"zot_up",
"Connection to zot server was successfully established.",
nil, nil,
),
}
invalidChars := regexp.MustCompile("[^a-zA-Z0-9:_]")
for metricName, metricLabelNames := range monitoring.GetCounters() {
name := invalidChars.ReplaceAllLiteralString(metricName, "_")
name += "_total"
panicOnDuplicateMetricName(MetricsDesc, name, c.Log)
MetricsDesc[name] = prometheus.NewDesc(name, "Metric "+name, metricLabelNames, nil)
}
for metricName, metricLabelNames := range monitoring.GetGauges() {
name := invalidChars.ReplaceAllLiteralString(metricName, "_")
panicOnDuplicateMetricName(MetricsDesc, name, c.Log)
MetricsDesc[name] = prometheus.NewDesc(name, "Metric "+name, metricLabelNames, nil)
}
for metricName, metricLabelNames := range monitoring.GetSummaries() {
mname := invalidChars.ReplaceAllLiteralString(metricName, "_")
name := mname + "_count"
panicOnDuplicateMetricName(MetricsDesc, name, c.Log)
MetricsDesc[name] = prometheus.NewDesc(name, "Metric "+name, metricLabelNames, nil)
name = mname + "_sum"
panicOnDuplicateMetricName(MetricsDesc, name, c.Log)
MetricsDesc[name] = prometheus.NewDesc(name, "Metric "+name, metricLabelNames, nil)
}
for metricName, metricLabelNames := range monitoring.GetHistograms() {
mname := invalidChars.ReplaceAllLiteralString(metricName, "_")
name := mname + "_count"
panicOnDuplicateMetricName(MetricsDesc, name, c.Log)
MetricsDesc[name] = prometheus.NewDesc(name, "Metric "+name, metricLabelNames, nil)
name = mname + "_sum"
panicOnDuplicateMetricName(MetricsDesc, name, c.Log)
MetricsDesc[name] = prometheus.NewDesc(name, "Metric "+name, metricLabelNames, nil)
name = mname + "_bucket"
panicOnDuplicateMetricName(MetricsDesc, name, c.Log)
// Append a new label to hitogram bucket - le - 'lower or equal'
MetricsDesc[name] = prometheus.NewDesc(name, "Metric "+name, append(metricLabelNames, "le"), nil)
}
// parameters to connect to the zot server
serverAddr := fmt.Sprintf("%s://%s:%s", c.Config.Server.Protocol,
c.Config.Server.Host, c.Config.Server.Port)
cfg := &monitoring.MetricsConfig{Address: serverAddr}
return &Collector{
Client: monitoring.NewMetricsClient(cfg, c.Log),
MetricsDesc: MetricsDesc,
invalidChars: invalidChars,
}
}
func runExporter(c *Controller) {
err := prometheus.Register(GetCollector(c))
if err != nil {
c.Log.Error().Err(err).Msg("Expected error in testing")
}
http.Handle(c.Config.Exporter.Metrics.Path, promhttp.Handler())
exporterAddr := fmt.Sprintf(":%s", c.Config.Exporter.Port)
c.Log.Info().Msgf("Exporter is listening on %s & exposes metrics on %s path",
exporterAddr, c.Config.Exporter.Metrics.Path)
serverAddr := fmt.Sprintf("%s://%s:%s", c.Config.Server.Protocol,
c.Config.Server.Host, c.Config.Server.Port)
c.Log.Info().Msgf("Scraping metrics from %s", serverAddr)
c.Log.Fatal().Err(http.ListenAndServe(exporterAddr, nil)).Msg("Exporter stopped")
}