From 5e35dfa28f822c61f01ce1a0f58cda040362d3f1 Mon Sep 17 00:00:00 2001 From: Andreea-Lupu Date: Mon, 21 Mar 2022 20:40:37 +0200 Subject: [PATCH] make gc periodic Signed-off-by: Andreea-Lupu --- examples/config-gc-periodic.json | 24 ++++++++++ pkg/api/config/config.go | 2 + pkg/api/controller.go | 56 +++++++++++++++-------- pkg/api/controller_test.go | 76 +++++++++++++++++++++++++++++++ pkg/cli/root.go | 20 +++++++-- pkg/cli/root_test.go | 40 +++++++++++++++++ pkg/storage/s3/storage.go | 3 ++ pkg/storage/storage.go | 1 + pkg/storage/storage_fs.go | 77 +++++++++++++++++++++++++------- pkg/storage/storage_fs_test.go | 64 ++++++++++++++++++++++++++ 10 files changed, 327 insertions(+), 36 deletions(-) create mode 100644 examples/config-gc-periodic.json diff --git a/examples/config-gc-periodic.json b/examples/config-gc-periodic.json new file mode 100644 index 00000000..d96eb4e6 --- /dev/null +++ b/examples/config-gc-periodic.json @@ -0,0 +1,24 @@ +{ + "distSpecVersion":"1.0.1", + "storage": { + "rootDirectory": "/tmp/zot", + "gc": true, + "gcDelay": "1h", + "gcInterval": "24h", + "subPaths": { + "/a": { + "rootDirectory": "/tmp/zot1", + "gc": true, + "gcDelay": "1h", + "gcInterval": "24h" + } + } + }, + "http": { + "address": "127.0.0.1", + "port": "8080" + }, + "log": { + "level": "debug" + } +} diff --git a/pkg/api/config/config.go b/pkg/api/config/config.go index ab36a087..181db10d 100644 --- a/pkg/api/config/config.go +++ b/pkg/api/config/config.go @@ -25,6 +25,7 @@ type StorageConfig struct { Dedupe bool Commit bool GCDelay time.Duration + GCInterval time.Duration StorageDriver map[string]interface{} `mapstructure:",omitempty"` } @@ -99,6 +100,7 @@ type GlobalStorageConfig struct { GC bool Commit bool GCDelay time.Duration + GCInterval time.Duration RootDirectory string StorageDriver map[string]interface{} `mapstructure:",omitempty"` SubPaths map[string]StorageConfig diff --git a/pkg/api/controller.go b/pkg/api/controller.go index c602b40d..2bec765f 100644 --- a/pkg/api/controller.go +++ b/pkg/api/controller.go @@ -259,11 +259,6 @@ func (c *Controller) InitImageStore(reloadCtx context.Context) error { } c.StoreController.DefaultStore = defaultStore - - // Enable extensions if extension config is provided - if c.Config != nil && c.Config.Extensions != nil { - ext.EnableExtensions(c.Config, c.Log, c.Config.Storage.RootDirectory) - } } else { // we can't proceed without global storage c.Log.Error().Err(errors.ErrImgStoreNotFound).Msg("controller: no storage config provided") @@ -309,25 +304,13 @@ func (c *Controller) InitImageStore(reloadCtx context.Context) error { subImageStore[route] = s3.NewImageStore(storageConfig.RootDirectory, storageConfig.GC, storageConfig.GCDelay, storageConfig.Dedupe, storageConfig.Commit, c.Log, c.Metrics, store) } - - // Enable extensions if extension config is provided - if c.Config != nil && c.Config.Extensions != nil { - ext.EnableExtensions(c.Config, c.Log, storageConfig.RootDirectory) - } } c.StoreController.SubStore = subImageStore } } - // Enable extensions if extension config is provided - if c.Config.Extensions != nil && c.Config.Extensions.Sync != nil && *c.Config.Extensions.Sync.Enable { - ext.EnableSyncExtension(reloadCtx, c.Config, c.wgShutDown, c.StoreController, c.Log) - } - - if c.Config.Extensions != nil { - ext.EnableScrubExtension(c.Config, c.StoreController, c.Log) - } + c.StartBackgroundTasks(reloadCtx) return nil } @@ -356,3 +339,40 @@ func (c *Controller) Shutdown() { ctx := context.Background() _ = c.Server.Shutdown(ctx) } + +func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) { + // Enable running garbage-collect periodically for DefaultStore + if c.Config.Storage.GC && c.Config.Storage.GCInterval != 0 { + c.StoreController.DefaultStore.RunGCPeriodically(c.Config.Storage.GCInterval) + } + + // Enable extensions if extension config is provided for DefaultStore + if c.Config != nil && c.Config.Extensions != nil { + ext.EnableExtensions(c.Config, c.Log, c.Config.Storage.RootDirectory) + } + + if c.Config.Storage.SubPaths != nil { + for route, storageConfig := range c.Config.Storage.SubPaths { + // Enable running garbage-collect periodically for subImageStore + if storageConfig.GC && storageConfig.GCInterval != 0 { + c.StoreController.SubStore[route].RunGCPeriodically(storageConfig.GCInterval) + } + + // Enable extensions if extension config is provided for subImageStore + if c.Config != nil && c.Config.Extensions != nil { + ext.EnableExtensions(c.Config, c.Log, storageConfig.RootDirectory) + } + } + } + + // Enable extensions if extension config is provided for storeController + if c.Config.Extensions != nil { + if c.Config.Extensions.Sync != nil && *c.Config.Extensions.Sync.Enable { + ext.EnableSyncExtension(reloadCtx, c.Config, c.wgShutDown, c.StoreController, c.Log) + } + } + + if c.Config.Extensions != nil { + ext.EnableScrubExtension(c.Config, c.StoreController, c.Log) + } +} diff --git a/pkg/api/controller_test.go b/pkg/api/controller_test.go index c73e5d54..1968744e 100644 --- a/pkg/api/controller_test.go +++ b/pkg/api/controller_test.go @@ -4472,6 +4472,82 @@ func TestInjectTooManyOpenFiles(t *testing.T) { }) } +func TestPeriodicGC(t *testing.T) { + Convey("Periodic gc enabled for default store", t, func() { + port := test.GetFreePort() + baseURL := test.GetBaseURL(port) + conf := config.New() + conf.HTTP.Port = port + + logFile, err := ioutil.TempFile("", "zot-log*.txt") + So(err, ShouldBeNil) + conf.Log.Level = "debug" + conf.Log.Output = logFile.Name() + defer os.Remove(logFile.Name()) // clean up + + ctlr := api.NewController(conf) + dir := t.TempDir() + ctlr.Config.Storage.RootDirectory = dir + ctlr.Config.Storage.GC = true + ctlr.Config.Storage.GCInterval = 1 * time.Hour + ctlr.Config.Storage.GCDelay = 1 * time.Second + + go startServer(ctlr) + defer stopServer(ctlr) + test.WaitTillServerReady(baseURL) + + data, err := os.ReadFile(logFile.Name()) + So(err, ShouldBeNil) + So(string(data), ShouldContainSubstring, + "\"GC\":true,\"Commit\":false,\"GCDelay\":1000000000,\"GCInterval\":3600000000000") + So(string(data), ShouldContainSubstring, + fmt.Sprintf("executing GC of orphaned blobs for %s", ctlr.StoreController.DefaultStore.RootDir())) + So(string(data), ShouldNotContainSubstring, + fmt.Sprintf("error while running GC for %s", ctlr.StoreController.DefaultStore.RootDir())) + So(string(data), ShouldContainSubstring, + fmt.Sprintf("GC completed for %s, next GC scheduled after", ctlr.StoreController.DefaultStore.RootDir())) + }) + + Convey("Periodic GC enabled for substore", t, func() { + port := test.GetFreePort() + baseURL := test.GetBaseURL(port) + conf := config.New() + conf.HTTP.Port = port + + logFile, err := ioutil.TempFile("", "zot-log*.txt") + So(err, ShouldBeNil) + conf.Log.Level = "debug" + conf.Log.Output = logFile.Name() + defer os.Remove(logFile.Name()) // clean up + + ctlr := api.NewController(conf) + dir := t.TempDir() + subDir := t.TempDir() + + subPaths := make(map[string]config.StorageConfig) + + subPaths["/a"] = config.StorageConfig{RootDirectory: subDir, GC: true, GCDelay: 1 * time.Second, GCInterval: 24 * time.Hour} // nolint:lll + + ctlr.Config.Storage.SubPaths = subPaths + ctlr.Config.Storage.RootDirectory = dir + + go startServer(ctlr) + defer stopServer(ctlr) + test.WaitTillServerReady(baseURL) + + data, err := os.ReadFile(logFile.Name()) + So(err, ShouldBeNil) + // periodic GC is not enabled for default store + So(string(data), ShouldContainSubstring, + "\"GCDelay\":3600000000000,\"GCInterval\":0,\"RootDirectory\":\""+dir+"\"") + // periodic GC is enabled for sub store + So(string(data), ShouldContainSubstring, + fmt.Sprintf("\"SubPaths\":{\"/a\":{\"RootDirectory\":\"%s\",\"GC\":true,\"Dedupe\":false,\"Commit\":false,\"GCDelay\":1000000000,\"GCInterval\":86400000000000", subDir)) // nolint:lll + So(string(data), ShouldContainSubstring, + fmt.Sprintf("executing GC of orphaned blobs for %s", ctlr.StoreController.SubStore["/a"].RootDir())) + }) +} + func getAllBlobs(imagePath string) []string { blobList := make([]string, 0) diff --git a/pkg/cli/root.go b/pkg/cli/root.go index 7ed6d232..0567426b 100644 --- a/pkg/cli/root.go +++ b/pkg/cli/root.go @@ -209,9 +209,23 @@ func validateConfiguration(config *config.Config) error { return errors.ErrBadConfig } - if !config.Storage.GC && config.Storage.GCDelay != 0 { - log.Warn().Err(errors.ErrBadConfig). - Msg("garbage-collect delay specified without enabling garbage-collect, will be ignored") + if config.Storage.GCInterval < 0 { + log.Error().Err(errors.ErrBadConfig). + Msgf("invalid garbage-collect interval %v specified", config.Storage.GCInterval) + + return errors.ErrBadConfig + } + + if !config.Storage.GC { + if config.Storage.GCDelay != 0 { + log.Warn().Err(errors.ErrBadConfig). + Msg("garbage-collect delay specified without enabling garbage-collect, will be ignored") + } + + if config.Storage.GCInterval != 0 { + log.Warn().Err(errors.ErrBadConfig). + Msg("periodic garbage-collect interval specified without enabling garbage-collect, will be ignored") + } } // check authorization config, it should have basic auth enabled or ldap diff --git a/pkg/cli/root_test.go b/pkg/cli/root_test.go index e755b30c..0330acd5 100644 --- a/pkg/cli/root_test.go +++ b/pkg/cli/root_test.go @@ -312,6 +312,8 @@ func TestGC(t *testing.T) { err = cli.LoadConfiguration(config, "../../examples/config-gc.json") So(err, ShouldBeNil) So(config.Storage.GCDelay, ShouldNotEqual, storage.DefaultGCDelay) + err = cli.LoadConfiguration(config, "../../examples/config-gc-periodic.json") + So(err, ShouldBeNil) }) Convey("Test GC config corner cases", t, func(c C) { @@ -336,6 +338,26 @@ func TestGC(t *testing.T) { So(err, ShouldBeNil) }) + Convey("GC interval without GC", func() { + config := config.New() + err = json.Unmarshal(contents, config) + config.Storage.GC = false + config.Storage.GCDelay = 0 + config.Storage.GCInterval = 24 * time.Hour + + file, err := ioutil.TempFile("", "gc-config-*.json") + So(err, ShouldBeNil) + defer os.Remove(file.Name()) + + contents, err = json.MarshalIndent(config, "", " ") + So(err, ShouldBeNil) + + err = ioutil.WriteFile(file.Name(), contents, 0o600) + So(err, ShouldBeNil) + err = cli.LoadConfiguration(config, file.Name()) + So(err, ShouldBeNil) + }) + Convey("Negative GC delay", func() { config := config.New() err = json.Unmarshal(contents, config) @@ -371,6 +393,24 @@ func TestGC(t *testing.T) { So(err, ShouldBeNil) So(config.Storage.GCDelay, ShouldEqual, 0) }) + + Convey("Negative GC interval", func() { + config := config.New() + err = json.Unmarshal(contents, config) + config.Storage.GCInterval = -1 * time.Second + + file, err := ioutil.TempFile("", "gc-config-*.json") + So(err, ShouldBeNil) + defer os.Remove(file.Name()) + + contents, err = json.MarshalIndent(config, "", " ") + So(err, ShouldBeNil) + + err = ioutil.WriteFile(file.Name(), contents, 0o600) + So(err, ShouldBeNil) + err = cli.LoadConfiguration(config, file.Name()) + So(err, ShouldNotBeNil) + }) }) } diff --git a/pkg/storage/s3/storage.go b/pkg/storage/s3/storage.go index a0dc0203..3b23dd37 100644 --- a/pkg/storage/s3/storage.go +++ b/pkg/storage/s3/storage.go @@ -1006,6 +1006,9 @@ func (is *ObjectStorage) DedupeBlob(src string, dstDigest godigest.Digest, dst s return nil } +func (is *ObjectStorage) RunGCPeriodically(gcInterval time.Duration) { +} + // DeleteBlobUpload deletes an existing blob upload that is currently in progress. func (is *ObjectStorage) DeleteBlobUpload(repo string, uuid string) error { blobUploadPath := is.BlobUploadPath(repo, uuid) diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 8bbd5b32..77a42c3a 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -44,4 +44,5 @@ type ImageStore interface { GetIndexContent(repo string) ([]byte, error) GetBlobContent(repo, digest string) ([]byte, error) GetReferrers(repo, digest string, mediaType string) ([]artifactspec.Descriptor, error) + RunGCPeriodically(gcInterval time.Duration) } diff --git a/pkg/storage/storage_fs.go b/pkg/storage/storage_fs.go index e67c3d8a..3ce8acb2 100644 --- a/pkg/storage/storage_fs.go +++ b/pkg/storage/storage_fs.go @@ -691,14 +691,7 @@ func (is *ImageStoreFS) PutImageManifest(repo string, reference string, mediaTyp } if is.gc { - oci, err := umoci.OpenLayout(dir) - if err := test.Error(err); err != nil { - return "", err - } - defer oci.Close() - - err = oci.GC(context.Background(), ifOlderThan(is, repo, is.gcDelay)) - if err := test.Error(err); err != nil { + if err := is.garbageCollect(dir, repo); err != nil { return "", err } } @@ -793,13 +786,7 @@ func (is *ImageStoreFS) DeleteImageManifest(repo string, reference string) error } if is.gc { - oci, err := umoci.OpenLayout(dir) - if err != nil { - return err - } - defer oci.Close() - - if err := oci.GC(context.Background(), ifOlderThan(is, repo, is.gcDelay)); err != nil { + if err := is.garbageCollect(dir, repo); err != nil { return err } } @@ -1610,6 +1597,21 @@ func ensureDir(dir string, log zerolog.Logger) error { return nil } +func (is *ImageStoreFS) garbageCollect(dir string, repo string) error { + oci, err := umoci.OpenLayout(dir) + if err := test.Error(err); err != nil { + return err + } + defer oci.Close() + + err = oci.GC(context.Background(), ifOlderThan(is, repo, is.gcDelay)) + if err := test.Error(err); err != nil { + return err + } + + return nil +} + func ifOlderThan(imgStore *ImageStoreFS, repo string, delay time.Duration) casext.GCPolicy { return func(ctx context.Context, digest godigest.Digest) (bool, error) { blobPath := imgStore.BlobPath(repo, digest) @@ -1641,3 +1643,48 @@ func DirExists(d string) bool { return true } + +func gcAllRepos(imgStore *ImageStoreFS) error { + repos, err := imgStore.GetRepositories() + if err != nil { + return err + } + + for _, repo := range repos { + dir := path.Join(imgStore.RootDir(), repo) + + var lockLatency time.Time + + imgStore.Lock(&lockLatency) + + err := imgStore.garbageCollect(dir, repo) + + imgStore.Unlock(&lockLatency) + + if err != nil { + return err + } + } + + return nil +} + +func (is *ImageStoreFS) RunGCPeriodically(gcInterval time.Duration) { + go func() { + for { + execMessage := fmt.Sprintf("executing GC of orphaned blobs for %s", is.RootDir()) + is.log.Info().Msg(execMessage) + + err := gcAllRepos(is) + if err != nil { + errMessage := fmt.Sprintf("error while running GC for %s", is.RootDir()) + is.log.Error().Err(err).Msg(errMessage) + } + + completedMessage := fmt.Sprintf("GC completed for %s, next GC scheduled after", is.RootDir()) + is.log.Info().Str(completedMessage, gcInterval.String()).Msg("") + + time.Sleep(gcInterval) + } + }() +} diff --git a/pkg/storage/storage_fs_test.go b/pkg/storage/storage_fs_test.go index 3790714a..1635ff0e 100644 --- a/pkg/storage/storage_fs_test.go +++ b/pkg/storage/storage_fs_test.go @@ -5,6 +5,7 @@ import ( "crypto/rand" _ "crypto/sha256" "encoding/json" + "fmt" "io/ioutil" "math/big" "os" @@ -1079,6 +1080,69 @@ func TestGarbageCollect(t *testing.T) { }) } +func TestGarbageCollectForImageStore(t *testing.T) { + Convey("Garbage collect for all repos from an ImageStore", t, func(c C) { + dir := t.TempDir() + + Convey("Garbage collect error for repo with config removed", func() { + logFile, _ := ioutil.TempFile("", "zot-log*.txt") + + defer os.Remove(logFile.Name()) // clean up + + log := log.NewLogger("debug", logFile.Name()) + metrics := monitoring.NewMetricsServer(false, log) + imgStore := storage.NewImageStore(dir, true, 1*time.Second, true, true, log, metrics) + repoName := "gc-all-repos-short" + + err := test.CopyFiles("../../test/data/zot-test", path.Join(dir, repoName)) + if err != nil { + panic(err) + } + + err = os.Remove(path.Join(dir, repoName, "blobs/sha256", + "2bacca16b9df395fc855c14ccf50b12b58d35d468b8e7f25758aff90f89bf396")) + if err != nil { + panic(err) + } + + imgStore.RunGCPeriodically(24 * time.Hour) + + time.Sleep(500 * time.Millisecond) + + data, err := os.ReadFile(logFile.Name()) + So(err, ShouldBeNil) + So(string(data), ShouldContainSubstring, fmt.Sprintf("error while running GC for %s", imgStore.RootDir())) + }) + + Convey("Garbage collect error - not enough permissions to access index.json", func() { + logFile, _ := ioutil.TempFile("", "zot-log*.txt") + + defer os.Remove(logFile.Name()) // clean up + + log := log.NewLogger("debug", logFile.Name()) + metrics := monitoring.NewMetricsServer(false, log) + imgStore := storage.NewImageStore(dir, true, 1*time.Second, true, true, log, metrics) + repoName := "gc-all-repos-short" + + err := test.CopyFiles("../../test/data/zot-test", path.Join(dir, repoName)) + if err != nil { + panic(err) + } + + So(os.Chmod(path.Join(dir, repoName, "index.json"), 0o000), ShouldBeNil) + + imgStore.RunGCPeriodically(24 * time.Hour) + + time.Sleep(500 * time.Millisecond) + + data, err := os.ReadFile(logFile.Name()) + So(err, ShouldBeNil) + So(string(data), ShouldContainSubstring, fmt.Sprintf("error while running GC for %s", imgStore.RootDir())) + So(os.Chmod(path.Join(dir, repoName, "index.json"), 0o755), ShouldBeNil) + }) + }) +} + func randSeq(n int) string { letters := []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")