mirror of
https://github.com/project-zot/zot.git
synced 2026-06-18 05:28:07 +08:00
da426850e7
* chore: Update golangci-lint Signed-off-by: Lars Francke <git@lars-francke.de> * chore: fix all golangci-lint issues - Remove deprecated `// +build` tags - Fix godoclint, modernize, wsl_v5, govet, lll, gci, noctx issues - Update linter configuration - Modernize code to use Go 1.22+ features (for range N, slices.Contains, etc.) - Update make check lint the privileged tests Signed-off-by: Andrei Aaron <andreifdaaron@gmail.com> --------- Signed-off-by: Lars Francke <git@lars-francke.de> Signed-off-by: Andrei Aaron <andreifdaaron@gmail.com> Co-authored-by: Lars Francke <git@lars-francke.de>
341 lines
9.2 KiB
Go
341 lines
9.2 KiB
Go
package cache
|
|
|
|
import (
|
|
"context"
|
|
goerrors "errors"
|
|
"path/filepath"
|
|
"strings"
|
|
|
|
"github.com/go-redsync/redsync/v4"
|
|
gors "github.com/go-redsync/redsync/v4/redis/goredis/v9"
|
|
godigest "github.com/opencontainers/go-digest"
|
|
"github.com/redis/go-redis/v9"
|
|
|
|
zerr "zotregistry.dev/zot/v2/errors"
|
|
zlog "zotregistry.dev/zot/v2/pkg/log"
|
|
"zotregistry.dev/zot/v2/pkg/storage/constants"
|
|
)
|
|
|
|
type RedisDriver struct {
|
|
rootDir string
|
|
db redis.UniversalClient
|
|
log zlog.Logger
|
|
keyPrefix string // prepended to all keys, logically separating cache drivers accessing the same DB
|
|
useRelPaths bool // whether or not to use relative paths, should be true for filesystem and false for s3
|
|
rs *redsync.Redsync // used for locks, at the moment we are locking only for calls writing to the DB
|
|
}
|
|
|
|
type RedisDriverParameters struct {
|
|
Client redis.UniversalClient
|
|
RootDir string
|
|
UseRelPaths bool
|
|
KeyPrefix string
|
|
}
|
|
|
|
func NewRedisCache(parameters any, log zlog.Logger) (*RedisDriver, error) {
|
|
properParameters, ok := parameters.(RedisDriverParameters)
|
|
if !ok {
|
|
log.Error().Err(zerr.ErrTypeAssertionFailed).Msgf("failed to cast type, expected type '%T' but got '%T'",
|
|
RedisDriverParameters{}, parameters)
|
|
|
|
return nil, zerr.ErrTypeAssertionFailed
|
|
}
|
|
|
|
keyPrefix := properParameters.KeyPrefix
|
|
if len(keyPrefix) == 0 {
|
|
keyPrefix = "zot"
|
|
}
|
|
|
|
cacheDB := properParameters.Client
|
|
|
|
if _, err := cacheDB.Ping(context.Background()).Result(); err != nil {
|
|
log.Error().Err(err).Msg("failed to ping redis cache")
|
|
|
|
return nil, err
|
|
}
|
|
|
|
// Create an instance of redisync to be used to obtain locks
|
|
pool := gors.NewPool(cacheDB)
|
|
|
|
// note for integration with local storage we need relative paths
|
|
// while for integration with s3 storage we need absolute paths
|
|
driver := &RedisDriver{
|
|
db: cacheDB,
|
|
log: log,
|
|
rootDir: properParameters.RootDir,
|
|
useRelPaths: properParameters.UseRelPaths,
|
|
keyPrefix: keyPrefix,
|
|
rs: redsync.New(pool),
|
|
}
|
|
|
|
return driver, nil
|
|
}
|
|
|
|
func (d *RedisDriver) join(xs ...string) string {
|
|
return d.keyPrefix + ":" + strings.Join(xs, ":")
|
|
}
|
|
|
|
func (d *RedisDriver) UsesRelativePaths() bool {
|
|
return d.useRelPaths
|
|
}
|
|
|
|
func (d *RedisDriver) Name() string {
|
|
return "redis"
|
|
}
|
|
|
|
// SetClient is supposed to be used only for testing purposes.
|
|
func (d *RedisDriver) SetClient(client redis.UniversalClient) {
|
|
d.db = client
|
|
}
|
|
|
|
func (d *RedisDriver) PutBlob(digest godigest.Digest, path string) error {
|
|
ctx := context.TODO()
|
|
|
|
if path == "" {
|
|
d.log.Error().Err(zerr.ErrEmptyValue).Str("digest", digest.String()).Msg("failed to provide non-empty path")
|
|
|
|
return zerr.ErrEmptyValue
|
|
}
|
|
|
|
// use only relative (to rootDir) paths on blobs
|
|
var err error
|
|
if d.useRelPaths {
|
|
path, err = filepath.Rel(d.rootDir, path)
|
|
if err != nil {
|
|
d.log.Error().Err(err).Str("path", path).Msg("failed to get relative path")
|
|
}
|
|
}
|
|
|
|
if len(path) == 0 {
|
|
return zerr.ErrEmptyValue
|
|
}
|
|
|
|
lock := d.rs.NewMutex(d.join(constants.RedisLocksBucket, digest.String()))
|
|
err = lock.Lock()
|
|
if err != nil {
|
|
d.log.Error().Err(err).Str("digest", digest.String()).Msg("failed to acquire redis lock")
|
|
|
|
return err
|
|
}
|
|
|
|
defer func() {
|
|
if _, err := lock.Unlock(); err != nil {
|
|
d.log.Error().Err(err).Str("digest", digest.String()).Msg("failed to release redis lock")
|
|
}
|
|
}()
|
|
|
|
// see if the blob digest exists.
|
|
exists, err := d.db.HExists(ctx, d.join(constants.BlobsCache, constants.OriginalBucket), digest.String()).Result()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if _, err := d.db.TxPipelined(ctx, func(txrp redis.Pipeliner) error {
|
|
if !exists {
|
|
// add the key value pair [digest, path] to blobs:origin if not
|
|
// exist already. the path becomes the canonical blob we do this in
|
|
// a transaction to make sure that if something is in the set, then
|
|
// it is guaranteed to always have a path
|
|
// note that there is a race, but the worst case is that a different
|
|
// origin path that is still valid is used.
|
|
if err := txrp.HSet(ctx, d.join(constants.BlobsCache, constants.OriginalBucket),
|
|
digest.String(), path).Err(); err != nil {
|
|
d.log.Error().Err(err).Str("hset", d.join(constants.BlobsCache, constants.OriginalBucket)).
|
|
Str("value", path).Msg("unable to put record")
|
|
|
|
return err
|
|
}
|
|
}
|
|
// add path to the set of paths which the digest represents
|
|
if err := txrp.SAdd(ctx, d.join(constants.BlobsCache, constants.DuplicatesBucket,
|
|
digest.String()), path).Err(); err != nil {
|
|
d.log.Error().Err(err).Str("sadd", d.join(constants.BlobsCache, constants.DuplicatesBucket, digest.String())).
|
|
Str("value", path).Msg("unable to put record")
|
|
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (d *RedisDriver) GetBlob(digest godigest.Digest) (string, error) {
|
|
ctx := context.TODO()
|
|
|
|
path, err := d.db.HGet(ctx, d.join(constants.BlobsCache, constants.OriginalBucket), digest.String()).Result()
|
|
if err != nil {
|
|
if goerrors.Is(err, redis.Nil) {
|
|
return "", zerr.ErrCacheMiss
|
|
}
|
|
|
|
d.log.Error().Err(err).Str("hget", d.join(constants.BlobsCache, constants.OriginalBucket)).
|
|
Str("digest", digest.String()).Msg("unable to get record")
|
|
|
|
return "", err
|
|
}
|
|
|
|
return path, nil
|
|
}
|
|
|
|
func (d *RedisDriver) GetAllBlobs(digest godigest.Digest) ([]string, error) {
|
|
blobPaths := []string{}
|
|
|
|
ctx := context.TODO()
|
|
|
|
originalPath, err := d.db.HGet(ctx, d.join(constants.BlobsCache, constants.OriginalBucket), digest.String()).Result()
|
|
if err != nil {
|
|
if goerrors.Is(err, redis.Nil) {
|
|
return nil, zerr.ErrCacheMiss
|
|
}
|
|
|
|
d.log.Error().Err(err).Str("hget", d.join(constants.BlobsCache, constants.OriginalBucket)).
|
|
Str("digest", digest.String()).Msg("unable to get record")
|
|
|
|
return nil, err
|
|
}
|
|
|
|
blobPaths = append(blobPaths, originalPath)
|
|
|
|
// see if we are in the set
|
|
duplicateBlobPaths, err := d.db.SMembers(ctx, d.join(constants.BlobsCache, constants.DuplicatesBucket,
|
|
digest.String())).Result()
|
|
if err != nil {
|
|
d.log.Error().Err(err).Str("smembers", d.join(constants.BlobsCache, constants.DuplicatesBucket, digest.String())).
|
|
Str("digest", digest.String()).Msg("unable to get record")
|
|
|
|
return nil, err
|
|
}
|
|
|
|
for _, item := range duplicateBlobPaths {
|
|
if item != originalPath {
|
|
blobPaths = append(blobPaths, item)
|
|
}
|
|
}
|
|
|
|
return blobPaths, nil
|
|
}
|
|
|
|
func (d *RedisDriver) HasBlob(digest godigest.Digest, path string) bool {
|
|
var err error
|
|
|
|
if d.useRelPaths {
|
|
path, err = filepath.Rel(d.rootDir, path)
|
|
if err != nil {
|
|
d.log.Error().Err(err).Str("path", path).Msg("failed to get relative path")
|
|
}
|
|
}
|
|
|
|
if len(path) == 0 {
|
|
return false
|
|
}
|
|
|
|
ctx := context.TODO()
|
|
// see if we are in the set
|
|
exists, err := d.db.SIsMember(ctx, d.join(constants.BlobsCache, constants.DuplicatesBucket,
|
|
digest.String()), path).Result()
|
|
if err != nil {
|
|
d.log.Error().Err(err).Str("sismember", d.join(constants.BlobsCache, constants.DuplicatesBucket, digest.String())).
|
|
Str("digest", digest.String()).Msg("unable to get record")
|
|
|
|
return false
|
|
}
|
|
|
|
if !exists {
|
|
return false
|
|
}
|
|
|
|
// see if the path entry exists. is this actually needed? i guess it doesn't really hurt (it is fast)
|
|
exists, err = d.db.HExists(ctx, d.join(constants.BlobsCache, constants.OriginalBucket), digest.String()).Result()
|
|
|
|
d.log.Error().Err(err).Str("hexists", d.join(constants.BlobsCache, constants.OriginalBucket)).
|
|
Str("digest", digest.String()).Msg("unable to get record")
|
|
|
|
if err != nil {
|
|
return false
|
|
}
|
|
|
|
if !exists {
|
|
return false
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
func (d *RedisDriver) DeleteBlob(digest godigest.Digest, path string) error {
|
|
ctx := context.TODO()
|
|
|
|
// use only relative (to rootDir) paths on blobs
|
|
var err error
|
|
if d.useRelPaths {
|
|
path, err = filepath.Rel(d.rootDir, path)
|
|
if err != nil {
|
|
d.log.Error().Err(err).Str("path", path).Msg("failed to get relative path")
|
|
}
|
|
}
|
|
|
|
lock := d.rs.NewMutex(d.join(constants.RedisLocksBucket, digest.String()))
|
|
err = lock.Lock()
|
|
if err != nil {
|
|
d.log.Error().Err(err).Str("digest", digest.String()).Msg("failed to acquire redis lock")
|
|
|
|
return err
|
|
}
|
|
|
|
defer func() {
|
|
if _, err := lock.Unlock(); err != nil {
|
|
d.log.Error().Err(err).Str("digest", digest.String()).Msg("failed to release redis lock")
|
|
}
|
|
}()
|
|
|
|
pathSet := d.join(constants.BlobsCache, constants.DuplicatesBucket, digest.String())
|
|
|
|
// delete path from the set of paths which the digest represents
|
|
_, err = d.db.SRem(ctx, pathSet, path).Result()
|
|
if err != nil {
|
|
d.log.Error().Err(err).Str("srem", pathSet).Str("value", path).Msg("failed to delete record")
|
|
|
|
return err
|
|
}
|
|
|
|
currentPath, err := d.GetBlob(digest)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if currentPath != path {
|
|
// nothing we need to do, return nil yay
|
|
return nil
|
|
}
|
|
|
|
// we need to set a new path
|
|
newPath, err := d.db.SRandMember(ctx, pathSet).Result()
|
|
if err != nil {
|
|
if goerrors.Is(err, redis.Nil) {
|
|
_, err := d.db.HDel(ctx, d.join(constants.BlobsCache, constants.OriginalBucket), digest.String()).Result()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
d.log.Error().Err(err).Str("srandmember", pathSet).Msg("failed to get new path")
|
|
|
|
return err
|
|
}
|
|
|
|
if _, err := d.db.HSet(ctx, d.join(constants.BlobsCache, constants.OriginalBucket),
|
|
digest.String(), newPath).Result(); err != nil {
|
|
d.log.Error().Err(err).Str("hset", d.join(constants.BlobsCache, constants.OriginalBucket)).Str("value", newPath).
|
|
Msg("unable to put record")
|
|
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|