From c4d34b72695cde02da71812b78184b1b952bcca1 Mon Sep 17 00:00:00 2001 From: Alexei Dodon Date: Tue, 21 Dec 2021 15:19:40 +0200 Subject: [PATCH] Added storage latency histogram metric Signed-off-by: Alexei Dodon --- examples/metrics/Makefile | 2 +- pkg/exporter/api/controller_test.go | 43 +++++++++++ pkg/exporter/api/exporter.go | 2 +- pkg/extensions/monitoring/common.go | 5 -- pkg/extensions/monitoring/extension.go | 23 ++++++ pkg/extensions/monitoring/minimal.go | 39 ++++++++-- pkg/storage/s3/storage.go | 91 +++++++++++++++++------- pkg/storage/scrub.go | 8 ++- pkg/storage/storage.go | 9 +-- pkg/storage/storage_fs.go | 98 ++++++++++++++++++-------- pkg/storage/storage_test.go | 11 +-- 11 files changed, 251 insertions(+), 80 deletions(-) diff --git a/examples/metrics/Makefile b/examples/metrics/Makefile index 2de51f51..77eb396b 100644 --- a/examples/metrics/Makefile +++ b/examples/metrics/Makefile @@ -1,4 +1,4 @@ -CONTAINER_RUNTIME := $(shell command -v podman 2> /dev/null || echo docker) +CONTAINER_RUNTIME := docker .PHONY: binary-container binary-container: diff --git a/pkg/exporter/api/controller_test.go b/pkg/exporter/api/controller_test.go index eb178d75..cc76a6a2 100644 --- a/pkg/exporter/api/controller_test.go +++ b/pkg/exporter/api/controller_test.go @@ -370,6 +370,49 @@ func TestNewExporter(t *testing.T) { So(isChannelDrained(chMetric), ShouldEqual, true) }) + Convey("Collecting data: Test init value & that observe works on Histogram buckets (lock latency)", func() { + // Testing initial value of the histogram counter to be 1 after first observation call + latency := getRandomLatency() + monitoring.ObserveStorageLockLatency(serverController.Metrics, latency, "/tmp/zot", "RWLock") + time.Sleep(SleepTime) + + go func() { + // this blocks + collector.Collect(chMetric) + }() + readDefaultMetrics(collector, chMetric) + + pmMetric := <-chMetric + So(pmMetric.Desc().String(), ShouldEqual, collector.MetricsDesc["zot_storage_lock_latency_seconds_count"].String()) + + var metric dto.Metric + err := pmMetric.Write(&metric) + So(err, ShouldBeNil) + So(*metric.Counter.Value, ShouldEqual, 1) + + pmMetric = <-chMetric + So(pmMetric.Desc().String(), ShouldEqual, collector.MetricsDesc["zot_storage_lock_latency_seconds_sum"].String()) + + err = pmMetric.Write(&metric) + So(err, ShouldBeNil) + So(*metric.Counter.Value, ShouldEqual, latency.Seconds()) + + for _, fvalue := range monitoring.GetBuckets("zot.storage.lock.latency.seconds") { + pmMetric = <-chMetric + So(pmMetric.Desc().String(), ShouldEqual, + collector.MetricsDesc["zot_storage_lock_latency_seconds_bucket"].String()) + + err = pmMetric.Write(&metric) + So(err, ShouldBeNil) + if latency.Seconds() < fvalue { + So(*metric.Counter.Value, ShouldEqual, 1) + } else { + So(*metric.Counter.Value, ShouldEqual, 0) + } + } + + So(isChannelDrained(chMetric), ShouldEqual, true) + }) Convey("Collecting data: Test init Histogram buckets \n", func() { // Generate a random latency within each bucket and finally test // that "higher" rank bucket counter is incremented by 1 diff --git a/pkg/exporter/api/exporter.go b/pkg/exporter/api/exporter.go index 0639cc3a..d92d4965 100644 --- a/pkg/exporter/api/exporter.go +++ b/pkg/exporter/api/exporter.go @@ -76,7 +76,7 @@ func (zc Collector) Collect(ch chan<- prometheus.Metric) { zc.MetricsDesc[name], prometheus.CounterValue, h.Sum, h.LabelValues...) if h.Buckets != nil { - for _, fvalue := range monitoring.GetDefaultBuckets() { + for _, fvalue := range monitoring.GetBuckets(h.Name) { var svalue string if fvalue == math.MaxFloat64 { svalue = "+Inf" diff --git a/pkg/extensions/monitoring/common.go b/pkg/extensions/monitoring/common.go index b4983e96..a31fca56 100644 --- a/pkg/extensions/monitoring/common.go +++ b/pkg/extensions/monitoring/common.go @@ -2,7 +2,6 @@ package monitoring import ( "fmt" - "math" "os" "path/filepath" ) @@ -15,10 +14,6 @@ type MetricServer interface { IsEnabled() bool } -func GetDefaultBuckets() []float64 { - return []float64{.05, .5, 1, 5, 30, 60, 600, math.MaxFloat64} -} - func getDirSize(path string) (int64, error) { var size int64 diff --git a/pkg/extensions/monitoring/extension.go b/pkg/extensions/monitoring/extension.go index 6df7c34a..4d889b2b 100644 --- a/pkg/extensions/monitoring/extension.go +++ b/pkg/extensions/monitoring/extension.go @@ -74,6 +74,15 @@ var ( }, []string{"commit", "binaryType", "goVersion", "version"}, ) + storageLockLatency = promauto.NewHistogramVec( // nolint: gochecknoglobals + prometheus.HistogramOpts{ + Namespace: metricsNamespace, + Name: "storage_lock_latency_seconds", + Help: "Latency of serving HTTP requests", + Buckets: GetStorageLatencyBuckets(), + }, + []string{"storageName", "lockType"}, + ) ) type metricServer struct { @@ -81,6 +90,14 @@ type metricServer struct { log log.Logger } +func GetDefaultBuckets() []float64 { + return []float64{.05, .5, 1, 5, 30, 60, 600} +} + +func GetStorageLatencyBuckets() []float64 { + return []float64{.001, .01, 0.1, 1, 5, 10, 15, 30, 60} +} + func NewMetricsServer(enabled bool, log log.Logger) MetricServer { return &metricServer{ enabled: enabled, @@ -174,3 +191,9 @@ func SetServerInfo(ms MetricServer, lvalues ...string) { serverInfo.WithLabelValues(lvalues...).Set(0) }) } + +func ObserveStorageLockLatency(ms MetricServer, latency time.Duration, storageName, lockType string) { + ms.SendMetric(func() { + storageLockLatency.WithLabelValues(storageName, lockType).Observe(latency.Seconds()) + }) +} diff --git a/pkg/extensions/monitoring/minimal.go b/pkg/extensions/monitoring/minimal.go index 87dfc18c..ceab0590 100644 --- a/pkg/extensions/monitoring/minimal.go +++ b/pkg/extensions/monitoring/minimal.go @@ -27,7 +27,8 @@ const ( // Summary. httpRepoLatencySeconds = metricsNamespace + ".http.repo.latency.seconds" // Histogram. - httpMethodLatencySeconds = metricsNamespace + ".http.method.latency.seconds" + httpMethodLatencySeconds = metricsNamespace + ".http.method.latency.seconds" + storageLockLatencySeconds = metricsNamespace + ".storage.lock.latency.seconds" metricsScrapeTimeout = 2 * time.Minute metricsScrapeCheckInterval = 30 * time.Second @@ -87,6 +88,14 @@ type HistogramValue struct { LabelValues []string } +func GetDefaultBuckets() []float64 { + return []float64{.05, .5, 1, 5, 30, 60, 600, math.MaxFloat64} +} + +func GetStorageLatencyBuckets() []float64 { + return []float64{.001, .01, 0.1, 1, 5, 10, 15, 30, 60, math.MaxFloat64} +} + // implements the MetricServer interface. func (ms *metricServer) SendMetric(metric interface{}) { if ms.enabled { @@ -172,7 +181,7 @@ func NewMetricsServer(enabled bool, log log.Logger) MetricServer { // convert to a map for returning easily the string corresponding to a bucket bucketsFloat2String := map[float64]string{} - for _, fvalue := range GetDefaultBuckets() { + for _, fvalue := range append(GetDefaultBuckets(), GetStorageLatencyBuckets()...) { if fvalue == math.MaxFloat64 { bucketsFloat2String[fvalue] = "+Inf" } else { @@ -219,7 +228,8 @@ func GetSummaries() map[string][]string { func GetHistograms() map[string][]string { return map[string][]string{ - httpMethodLatencySeconds: {"method"}, + httpMethodLatencySeconds: {"method"}, + storageLockLatencySeconds: {"storageName", "lockType"}, } } @@ -366,7 +376,7 @@ func (ms *metricServer) HistogramObserve(hv *HistogramValue) { // The HistogramValue not found: add it buckets := make(map[string]int) - for _, fvalue := range GetDefaultBuckets() { + for _, fvalue := range GetBuckets(hv.Name) { if hv.Sum <= fvalue { buckets[ms.bucketsF2S[fvalue]] = 1 } else { @@ -381,7 +391,7 @@ func (ms *metricServer) HistogramObserve(hv *HistogramValue) { cachedH := ms.cache.Histograms[index] cachedH.Count++ cachedH.Sum += hv.Sum - for _, fvalue := range GetDefaultBuckets() { + for _, fvalue := range GetBuckets(hv.Name) { if hv.Sum <= fvalue { cachedH.Buckets[ms.bucketsF2S[fvalue]]++ } @@ -497,6 +507,25 @@ func SetServerInfo(ms MetricServer, lvs ...string) { ms.ForceSendMetric(info) } +func ObserveStorageLockLatency(ms MetricServer, latency time.Duration, storageName, lockType string) { + h := HistogramValue{ + Name: storageLockLatencySeconds, + Sum: latency.Seconds(), // convenient temporary store for Histogram latency value + LabelNames: []string{"storageName", "lockType"}, + LabelValues: []string{storageName, lockType}, + } + ms.SendMetric(h) +} + func GetMaxIdleScrapeInterval() time.Duration { return metricsScrapeTimeout + metricsScrapeCheckInterval } + +func GetBuckets(metricName string) []float64 { + switch metricName { + case storageLockLatencySeconds: + return GetStorageLatencyBuckets() + default: + return GetDefaultBuckets() + } +} diff --git a/pkg/storage/s3/storage.go b/pkg/storage/s3/storage.go index 383d9f0e..8bac23d1 100644 --- a/pkg/storage/s3/storage.go +++ b/pkg/storage/s3/storage.go @@ -12,6 +12,7 @@ import ( "path/filepath" "strings" "sync" + "time" // Add s3 support. "github.com/docker/distribution/registry/storage/driver" @@ -29,6 +30,11 @@ import ( "zotregistry.io/zot/pkg/storage" ) +const ( + RLOCK = "RLock" + RWLOCK = "RWLock" +) + // ObjectStorage provides the image storage operations. type ObjectStorage struct { rootDir string @@ -73,23 +79,37 @@ func NewImageStore(rootDir string, gc bool, dedupe bool, log zlog.Logger, metric } // RLock read-lock. -func (is *ObjectStorage) RLock() { +func (is *ObjectStorage) RLock(lockStart *time.Time) { + *lockStart = time.Now() + is.lock.RLock() } // RUnlock read-unlock. -func (is *ObjectStorage) RUnlock() { +func (is *ObjectStorage) RUnlock(lockStart *time.Time) { is.lock.RUnlock() + + lockEnd := time.Now() + // includes time spent in acquiring and holding a lock + latency := lockEnd.Sub(*lockStart) + monitoring.ObserveStorageLockLatency(is.metrics, latency, is.RootDir(), RLOCK) // histogram } // Lock write-lock. -func (is *ObjectStorage) Lock() { +func (is *ObjectStorage) Lock(lockStart *time.Time) { + *lockStart = time.Now() + is.lock.Lock() } // Unlock write-unlock. -func (is *ObjectStorage) Unlock() { +func (is *ObjectStorage) Unlock(lockStart *time.Time) { is.lock.Unlock() + + lockEnd := time.Now() + // includes time spent in acquiring and holding a lock + latency := lockEnd.Sub(*lockStart) + monitoring.ObserveStorageLockLatency(is.metrics, latency, is.RootDir(), RWLOCK) // histogram } func (is *ObjectStorage) initRepo(name string) error { @@ -143,8 +163,10 @@ func (is *ObjectStorage) initRepo(name string) error { // InitRepo creates an image repository under this store. func (is *ObjectStorage) InitRepo(name string) error { - is.Lock() - defer is.Unlock() + var lockLatency time.Time + + is.Lock(&lockLatency) + defer is.Unlock(&lockLatency) return is.initRepo(name) } @@ -220,10 +242,12 @@ func (is *ObjectStorage) ValidateRepo(name string) (bool, error) { // GetRepositories returns a list of all the repositories under this store. func (is *ObjectStorage) GetRepositories() ([]string, error) { + var lockLatency time.Time + dir := is.rootDir - is.RLock() - defer is.RUnlock() + is.RLock(&lockLatency) + defer is.RUnlock(&lockLatency) stores := make([]string, 0) err := is.store.Walk(context.Background(), dir, func(fileInfo driver.FileInfo) error { @@ -256,13 +280,15 @@ func (is *ObjectStorage) GetRepositories() ([]string, error) { // GetImageTags returns a list of image tags available in the specified repository. func (is *ObjectStorage) GetImageTags(repo string) ([]string, error) { + var lockLatency time.Time + dir := path.Join(is.rootDir, repo) if fi, err := is.store.Stat(context.Background(), dir); err != nil || !fi.IsDir() { return nil, zerr.ErrRepoNotFound } - is.RLock() - defer is.RUnlock() + is.RLock(&lockLatency) + defer is.RUnlock(&lockLatency) buf, err := is.GetIndexContent(repo) if err != nil { @@ -290,13 +316,15 @@ func (is *ObjectStorage) GetImageTags(repo string) ([]string, error) { // GetImageManifest returns the image manifest of an image in the specific repository. func (is *ObjectStorage) GetImageManifest(repo string, reference string) ([]byte, string, string, error) { + var lockLatency time.Time + dir := path.Join(is.rootDir, repo) if fi, err := is.store.Stat(context.Background(), dir); err != nil || !fi.IsDir() { return nil, "", "", zerr.ErrRepoNotFound } - is.RLock() - defer is.RUnlock() + is.RLock(&lockLatency) + defer is.RUnlock(&lockLatency) buf, err := is.GetIndexContent(repo) if err != nil { @@ -422,8 +450,10 @@ func (is *ObjectStorage) PutImageManifest(repo string, reference string, mediaTy refIsDigest = true } - is.Lock() - defer is.Unlock() + var lockLatency time.Time + + is.Lock(&lockLatency) + defer is.Unlock(&lockLatency) dir := path.Join(is.rootDir, repo) @@ -526,6 +556,8 @@ func (is *ObjectStorage) PutImageManifest(repo string, reference string, mediaTy // DeleteImageManifest deletes the image manifest from the repository. func (is *ObjectStorage) DeleteImageManifest(repo string, reference string) error { + var lockLatency time.Time + dir := path.Join(is.rootDir, repo) if fi, err := is.store.Stat(context.Background(), dir); err != nil || !fi.IsDir() { return zerr.ErrRepoNotFound @@ -541,8 +573,8 @@ func (is *ObjectStorage) DeleteImageManifest(repo string, reference string) erro isTag = true } - is.Lock() - defer is.Unlock() + is.Lock(&lockLatency) + defer is.Unlock(&lockLatency) buf, err := is.GetIndexContent(repo) if err != nil { @@ -921,11 +953,8 @@ func (is *ObjectStorage) FullBlobUpload(repo string, body io.Reader, digest stri } uuid := u.String() - src := is.BlobUploadPath(repo, uuid) - digester := sha256.New() - buf := new(bytes.Buffer) _, err = buf.ReadFrom(body) @@ -957,8 +986,10 @@ func (is *ObjectStorage) FullBlobUpload(repo string, body io.Reader, digest stri return "", -1, zerr.ErrBadBlobDigest } - is.Lock() - defer is.Unlock() + var lockLatency time.Time + + is.Lock(&lockLatency) + defer is.Unlock(&lockLatency) dst := is.BlobPath(repo, dstDigest) @@ -995,6 +1026,8 @@ func (is *ObjectStorage) BlobPath(repo string, digest godigest.Digest) string { // CheckBlob verifies a blob and returns true if the blob is correct. func (is *ObjectStorage) CheckBlob(repo string, digest string) (bool, int64, error) { + var lockLatency time.Time + dgst, err := godigest.Parse(digest) if err != nil { is.log.Error().Err(err).Str("digest", digest).Msg("failed to parse digest") @@ -1004,8 +1037,8 @@ func (is *ObjectStorage) CheckBlob(repo string, digest string) (bool, int64, err blobPath := is.BlobPath(repo, dgst) - is.RLock() - defer is.RUnlock() + is.RLock(&lockLatency) + defer is.RUnlock(&lockLatency) binfo, err := is.store.Stat(context.Background(), blobPath) if err != nil { @@ -1027,6 +1060,8 @@ func (is *ObjectStorage) CheckBlob(repo string, digest string) (bool, int64, err // GetBlob returns a stream to read the blob. // blob selector instead of directly downloading the blob. func (is *ObjectStorage) GetBlob(repo string, digest string, mediaType string) (io.Reader, int64, error) { + var lockLatency time.Time + dgst, err := godigest.Parse(digest) if err != nil { is.log.Error().Err(err).Str("digest", digest).Msg("failed to parse digest") @@ -1036,8 +1071,8 @@ func (is *ObjectStorage) GetBlob(repo string, digest string, mediaType string) ( blobPath := is.BlobPath(repo, dgst) - is.RLock() - defer is.RUnlock() + is.RLock(&lockLatency) + defer is.RUnlock(&lockLatency) binfo, err := is.store.Stat(context.Background(), blobPath) if err != nil { @@ -1093,6 +1128,8 @@ func (is *ObjectStorage) GetIndexContent(repo string) ([]byte, error) { // DeleteBlob removes the blob from the repository. func (is *ObjectStorage) DeleteBlob(repo string, digest string) error { + var lockLatency time.Time + dgst, err := godigest.Parse(digest) if err != nil { is.log.Error().Err(err).Str("digest", digest).Msg("failed to parse digest") @@ -1102,8 +1139,8 @@ func (is *ObjectStorage) DeleteBlob(repo string, digest string) error { blobPath := is.BlobPath(repo, dgst) - is.Lock() - defer is.Unlock() + is.Lock(&lockLatency) + defer is.Unlock(&lockLatency) _, err = is.store.Stat(context.Background(), blobPath) if err != nil { diff --git a/pkg/storage/scrub.go b/pkg/storage/scrub.go index 1e1849f1..aa80a031 100644 --- a/pkg/storage/scrub.go +++ b/pkg/storage/scrub.go @@ -9,6 +9,7 @@ import ( "os" "path" "strings" + "time" "github.com/olekukonko/tablewriter" godigest "github.com/opencontainers/go-digest" @@ -84,11 +85,12 @@ func checkImage(imageName string, imgStore ImageStore) ([]ScrubImageResult, erro if err != nil { return results, err } - defer oci.Close() - imgStore.RLock() - defer imgStore.RUnlock() + var lockLatency time.Time + + imgStore.RLock(&lockLatency) + defer imgStore.RUnlock(&lockLatency) buf, err := ioutil.ReadFile(path.Join(dir, "index.json")) if err != nil { diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 6c881ed5..7d483eb7 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -2,6 +2,7 @@ package storage import ( "io" + "time" "github.com/notaryproject/notation-go-lib" "github.com/opencontainers/go-digest" @@ -14,10 +15,10 @@ const ( type ImageStore interface { DirExists(d string) bool RootDir() string - RLock() - RUnlock() - Lock() - Unlock() + RLock(*time.Time) + RUnlock(*time.Time) + Lock(*time.Time) + Unlock(*time.Time) InitRepo(name string) error ValidateRepo(name string) (bool, error) GetRepositories() ([]string, error) diff --git a/pkg/storage/storage_fs.go b/pkg/storage/storage_fs.go index c30cd5b0..fa0ce04b 100644 --- a/pkg/storage/storage_fs.go +++ b/pkg/storage/storage_fs.go @@ -37,6 +37,8 @@ const ( gcDelay = 1 * time.Hour DefaultFilePerms = 0o600 DefaultDirPerms = 0o700 + RLOCK = "RLock" + RWLOCK = "RWLock" ) // BlobUpload models and upload request. @@ -142,23 +144,35 @@ func NewImageStore(rootDir string, gc bool, dedupe bool, log zlog.Logger, metric } // RLock read-lock. -func (is *ImageStoreFS) RLock() { +func (is *ImageStoreFS) RLock(lockStart *time.Time) { + *lockStart = time.Now() + is.lock.RLock() } // RUnlock read-unlock. -func (is *ImageStoreFS) RUnlock() { +func (is *ImageStoreFS) RUnlock(lockStart *time.Time) { is.lock.RUnlock() + + lockEnd := time.Now() + latency := lockEnd.Sub(*lockStart) + monitoring.ObserveStorageLockLatency(is.metrics, latency, is.RootDir(), RLOCK) // histogram } // Lock write-lock. -func (is *ImageStoreFS) Lock() { +func (is *ImageStoreFS) Lock(lockStart *time.Time) { + *lockStart = time.Now() + is.lock.Lock() } // Unlock write-unlock. -func (is *ImageStoreFS) Unlock() { +func (is *ImageStoreFS) Unlock(lockStart *time.Time) { is.lock.Unlock() + + lockEnd := time.Now() + latency := lockEnd.Sub(*lockStart) + monitoring.ObserveStorageLockLatency(is.metrics, latency, is.RootDir(), RWLOCK) // histogram } func (is *ImageStoreFS) initRepo(name string) error { @@ -218,8 +232,10 @@ func (is *ImageStoreFS) initRepo(name string) error { // InitRepo creates an image repository under this store. func (is *ImageStoreFS) InitRepo(name string) error { - is.Lock() - defer is.Unlock() + var lockLatency time.Time + + is.Lock(&lockLatency) + defer is.Unlock(&lockLatency) return is.initRepo(name) } @@ -284,10 +300,12 @@ func (is *ImageStoreFS) ValidateRepo(name string) (bool, error) { // GetRepositories returns a list of all the repositories under this store. func (is *ImageStoreFS) GetRepositories() ([]string, error) { + var lockLatency time.Time + dir := is.rootDir - is.RLock() - defer is.RUnlock() + is.RLock(&lockLatency) + defer is.RUnlock(&lockLatency) _, err := ioutil.ReadDir(dir) if err != nil { @@ -326,13 +344,15 @@ func (is *ImageStoreFS) GetRepositories() ([]string, error) { // GetImageTags returns a list of image tags available in the specified repository. func (is *ImageStoreFS) GetImageTags(repo string) ([]string, error) { + var lockLatency time.Time + dir := path.Join(is.rootDir, repo) if !is.DirExists(dir) { return nil, zerr.ErrRepoNotFound } - is.RLock() - defer is.RUnlock() + is.RLock(&lockLatency) + defer is.RUnlock(&lockLatency) buf, err := ioutil.ReadFile(path.Join(dir, "index.json")) if err != nil { @@ -362,13 +382,15 @@ func (is *ImageStoreFS) GetImageTags(repo string) ([]string, error) { // GetImageManifest returns the image manifest of an image in the specific repository. func (is *ImageStoreFS) GetImageManifest(repo string, reference string) ([]byte, string, string, error) { + var lockLatency time.Time + dir := path.Join(is.rootDir, repo) if !is.DirExists(dir) { return nil, "", "", zerr.ErrRepoNotFound } - is.RLock() - defer is.RUnlock() + is.RLock(&lockLatency) + defer is.RUnlock(&lockLatency) buf, err := ioutil.ReadFile(path.Join(dir, "index.json")) if err != nil { @@ -514,8 +536,10 @@ func (is *ImageStoreFS) PutImageManifest(repo string, reference string, mediaTyp refIsDigest = true } - is.Lock() - defer is.Unlock() + var lockLatency time.Time + + is.Lock(&lockLatency) + defer is.Unlock(&lockLatency) dir := path.Join(is.rootDir, repo) @@ -633,6 +657,8 @@ func (is *ImageStoreFS) PutImageManifest(repo string, reference string, mediaTyp // DeleteImageManifest deletes the image manifest from the repository. func (is *ImageStoreFS) DeleteImageManifest(repo string, reference string) error { + var lockLatency time.Time + dir := path.Join(is.rootDir, repo) if !is.DirExists(dir) { return zerr.ErrRepoNotFound @@ -648,8 +674,8 @@ func (is *ImageStoreFS) DeleteImageManifest(repo string, reference string) error isTag = true } - is.Lock() - defer is.Unlock() + is.Lock(&lockLatency) + defer is.Unlock(&lockLatency) buf, err := ioutil.ReadFile(path.Join(dir, "index.json")) if err != nil { @@ -934,8 +960,10 @@ func (is *ImageStoreFS) FinishBlobUpload(repo string, uuid string, body io.Reade dir := path.Join(is.rootDir, repo, "blobs", dstDigest.Algorithm().String()) - is.Lock() - defer is.Unlock() + var lockLatency time.Time + + is.Lock(&lockLatency) + defer is.Unlock(&lockLatency) err = ensureDir(dir, is.log) if err != nil { @@ -1014,8 +1042,10 @@ func (is *ImageStoreFS) FullBlobUpload(repo string, body io.Reader, digest strin dir := path.Join(is.rootDir, repo, "blobs", dstDigest.Algorithm().String()) - is.Lock() - defer is.Unlock() + var lockLatency time.Time + + is.Lock(&lockLatency) + defer is.Unlock(&lockLatency) _ = ensureDir(dir, is.log) dst := is.BlobPath(repo, dstDigest) @@ -1141,6 +1171,8 @@ func (is *ImageStoreFS) BlobPath(repo string, digest godigest.Digest) string { // CheckBlob verifies a blob and returns true if the blob is correct. func (is *ImageStoreFS) CheckBlob(repo string, digest string) (bool, int64, error) { + var lockLatency time.Time + parsedDigest, err := godigest.Parse(digest) if err != nil { is.log.Error().Err(err).Str("digest", digest).Msg("failed to parse digest") @@ -1151,11 +1183,11 @@ func (is *ImageStoreFS) CheckBlob(repo string, digest string) (bool, int64, erro blobPath := is.BlobPath(repo, parsedDigest) if is.dedupe && is.cache != nil { - is.Lock() - defer is.Unlock() + is.Lock(&lockLatency) + defer is.Unlock(&lockLatency) } else { - is.RLock() - defer is.RUnlock() + is.RLock(&lockLatency) + defer is.RUnlock(&lockLatency) } binfo, err := os.Stat(blobPath) @@ -1233,6 +1265,8 @@ func (is *ImageStoreFS) copyBlob(repo string, blobPath string, dstRecord string) // GetBlob returns a stream to read the blob. // blob selector instead of directly downloading the blob. func (is *ImageStoreFS) GetBlob(repo string, digest string, mediaType string) (io.Reader, int64, error) { + var lockLatency time.Time + parsedDigest, err := godigest.Parse(digest) if err != nil { is.log.Error().Err(err).Str("digest", digest).Msg("failed to parse digest") @@ -1242,8 +1276,8 @@ func (is *ImageStoreFS) GetBlob(repo string, digest string, mediaType string) (i blobPath := is.BlobPath(repo, parsedDigest) - is.RLock() - defer is.RUnlock() + is.RLock(&lockLatency) + defer is.RUnlock(&lockLatency) binfo, err := os.Stat(blobPath) if err != nil { @@ -1301,6 +1335,8 @@ func (is *ImageStoreFS) GetIndexContent(repo string) ([]byte, error) { // DeleteBlob removes the blob from the repository. func (is *ImageStoreFS) DeleteBlob(repo string, digest string) error { + var lockLatency time.Time + dgst, err := godigest.Parse(digest) if err != nil { is.log.Error().Err(err).Str("digest", digest).Msg("failed to parse digest") @@ -1310,8 +1346,8 @@ func (is *ImageStoreFS) DeleteBlob(repo string, digest string) error { blobPath := is.BlobPath(repo, dgst) - is.Lock() - defer is.Unlock() + is.Lock(&lockLatency) + defer is.Unlock(&lockLatency) _, err = os.Stat(blobPath) if err != nil { @@ -1338,6 +1374,8 @@ func (is *ImageStoreFS) DeleteBlob(repo string, digest string) error { } func (is *ImageStoreFS) GetReferrers(repo, digest string, mediaType string) ([]notation.Descriptor, error) { + var lockLatency time.Time + dir := path.Join(is.rootDir, repo) if !is.DirExists(dir) { return nil, zerr.ErrRepoNotFound @@ -1350,8 +1388,8 @@ func (is *ImageStoreFS) GetReferrers(repo, digest string, mediaType string) ([]n return nil, zerr.ErrBadBlobDigest } - is.RLock() - defer is.RUnlock() + is.RLock(&lockLatency) + defer is.RUnlock(&lockLatency) buf, err := ioutil.ReadFile(path.Join(dir, "index.json")) if err != nil { diff --git a/pkg/storage/storage_test.go b/pkg/storage/storage_test.go index 2d6847df..c4e49fa0 100644 --- a/pkg/storage/storage_test.go +++ b/pkg/storage/storage_test.go @@ -12,6 +12,7 @@ import ( "strings" "sync" "testing" + "time" // Add s3 support. "github.com/docker/distribution/registry/storage/driver" @@ -599,16 +600,18 @@ func TestStorageAPIs(t *testing.T) { for i := 0; i < 1000; i++ { wg.Add(2) go func() { + var lockLatency time.Time defer wg.Done() - imgStore.Lock() + imgStore.Lock(&lockLatency) func() {}() - imgStore.Unlock() + imgStore.Unlock(&lockLatency) }() go func() { + var lockLatency time.Time defer wg.Done() - imgStore.RLock() + imgStore.RLock(&lockLatency) func() {}() - imgStore.RUnlock() + imgStore.RUnlock(&lockLatency) }() } wg.Wait()