redis driver for blob cache information and metadb (#2865)

* feat: add redis cache support

https://github.com/project-zot/zot/pull/2005
Fixes https://github.com/project-zot/zot/issues/2004

* feat: add redis cache support

Currently, we have dynamoDB as the remote shared cache but ideal only
for the cloud use case.
For on-prem use case, add support for redis.

Signed-off-by: Ramkumar Chinchani <rchincha@cisco.com>

* feat(redis): added blackbox tests for redis

Signed-off-by: Petu Eusebiu <peusebiu@cisco.com>

* feat(redis): dummy implementation of MetaDB interface for redis cache

Signed-off-by: Alexei Dodon <adodon@cisco.com>

* feat: check validity of driver configuration on metadb instantiation

Signed-off-by: Andrei Aaron <aaaron@luxoft.com>

* feat: multiple fixes for redis cache driver implementation

- add missing method GetAllBlobs
- add redis cache tests, with and without mocking

Signed-off-by: Andrei Aaron <aaaron@luxoft.com>

* feat(redis): redis implementation for MetaDB

Signed-off-by: Andrei Aaron <aaaron@luxoft.com>

* feat(redis): use redsync to block concurrent write access to the redis DB

Signed-off-by: Andrei Aaron <aaaron@luxoft.com>

* feat(redis): update .github/workflows/cluster.yaml to also test redis

Signed-off-by: Andrei Aaron <aaaron@luxoft.com>

* feat(metadb): add keyPrefix parameter for redis and remove unneeded method meta.Crate()

Signed-off-by: Andrei Aaron <aaaron@luxoft.com>

* feat(redis): support RedisCluster configuration and add unit tests

Signed-off-by: Andrei Aaron <aaaron@luxoft.com>

* feat(redis): more tests for redis metadb implementation

Signed-off-by: Andrei Aaron <aaaron@luxoft.com>

* feat(redis): add more examples and update examples/README.md

Signed-off-by: Andrei Aaron <aaaron@luxoft.com>

* feat(redis): move option parsing and redis client initialization under pkg/api/config/redis

Signed-off-by: Andrei Aaron <aaaron@luxoft.com>

* chore(cachedb): move Cache interface to pkg/storage/types

Signed-off-by: Andrei Aaron <aaaron@luxoft.com>

* feat(redis): reorganize code in pkg/storage/cache.go

Signed-off-by: Andrei Aaron <aaaron@luxoft.com>

* feat(redis): call redis.SetLogger() with the zot logger as parameter

Signed-off-by: Andrei Aaron <aaaron@luxoft.com>

* feat(redis): rename pkg/meta/redisdb to pkg/meta/redis

Signed-off-by: Andrei Aaron <aaaron@luxoft.com>

---------

Signed-off-by: Ramkumar Chinchani <rchincha@cisco.com>
Signed-off-by: Petu Eusebiu <peusebiu@cisco.com>
Signed-off-by: Alexei Dodon <adodon@cisco.com>
Signed-off-by: Andrei Aaron <aaaron@luxoft.com>
Co-authored-by: a <a@tuxpa.in>
Co-authored-by: Ramkumar Chinchani <rchincha@cisco.com>
Co-authored-by: Petu Eusebiu <peusebiu@cisco.com>
Co-authored-by: Alexei Dodon <adodon@cisco.com>
This commit is contained in:
Andrei Aaron
2025-01-30 21:00:52 +02:00
committed by GitHub
parent 90e1393585
commit 05823cd74f
43 changed files with 7886 additions and 442 deletions
-27
View File
@@ -1,27 +0,0 @@
package cache
import (
godigest "github.com/opencontainers/go-digest"
)
type Cache interface {
// Returns the human-readable "name" of the driver.
Name() string
// Retrieves the blob matching provided digest.
GetBlob(digest godigest.Digest) (string, error)
GetAllBlobs(digest godigest.Digest) ([]string, error)
// Uploads blob to cachedb.
PutBlob(digest godigest.Digest, path string) error
// Check if blob exists in cachedb.
HasBlob(digest godigest.Digest, path string) bool
// Delete a blob from the cachedb.
DeleteBlob(digest godigest.Digest, path string) error
// UsesRelativePaths returns if cache is storing blobs relative to cache rootDir
UsesRelativePaths() bool
}
+340
View File
@@ -0,0 +1,340 @@
package cache
import (
"context"
goerrors "errors"
"path/filepath"
"strings"
"github.com/go-redsync/redsync/v4"
gors "github.com/go-redsync/redsync/v4/redis/goredis/v9"
godigest "github.com/opencontainers/go-digest"
"github.com/redis/go-redis/v9"
zerr "zotregistry.dev/zot/errors"
zlog "zotregistry.dev/zot/pkg/log"
"zotregistry.dev/zot/pkg/storage/constants"
)
type RedisDriver struct {
rootDir string
db redis.UniversalClient
log zlog.Logger
keyPrefix string // prepended to all keys, logically separating cache drivers accessing the same DB
useRelPaths bool // whether or not to use relative paths, should be true for filesystem and false for s3
rs *redsync.Redsync // used for locks, at the moment we are locking only for calls writing to the DB
}
type RedisDriverParameters struct {
Client redis.UniversalClient
RootDir string
UseRelPaths bool
KeyPrefix string
}
func NewRedisCache(parameters interface{}, log zlog.Logger) (*RedisDriver, error) {
properParameters, ok := parameters.(RedisDriverParameters)
if !ok {
log.Error().Err(zerr.ErrTypeAssertionFailed).Msgf("failed to cast type, expected type '%T' but got '%T'",
RedisDriverParameters{}, parameters)
return nil, zerr.ErrTypeAssertionFailed
}
keyPrefix := properParameters.KeyPrefix
if len(keyPrefix) == 0 {
keyPrefix = "zot"
}
cacheDB := properParameters.Client
if _, err := cacheDB.Ping(context.Background()).Result(); err != nil {
log.Error().Err(err).Msg("failed to ping redis cache")
return nil, err
}
// Create an instance of redisync to be used to obtain locks
pool := gors.NewPool(cacheDB)
// note for integration with local storage we need relative paths
// while for integration with s3 storage we need absolute paths
driver := &RedisDriver{
db: cacheDB,
log: log,
rootDir: properParameters.RootDir,
useRelPaths: properParameters.UseRelPaths,
keyPrefix: keyPrefix,
rs: redsync.New(pool),
}
return driver, nil
}
func (d *RedisDriver) join(xs ...string) string {
return d.keyPrefix + ":" + strings.Join(xs, ":")
}
func (d *RedisDriver) UsesRelativePaths() bool {
return d.useRelPaths
}
func (d *RedisDriver) Name() string {
return "redis"
}
// SetClient is supposed to be used only for testing purposes.
func (d *RedisDriver) SetClient(client redis.UniversalClient) {
d.db = client
}
func (d *RedisDriver) PutBlob(digest godigest.Digest, path string) error {
ctx := context.TODO()
if path == "" {
d.log.Error().Err(zerr.ErrEmptyValue).Str("digest", digest.String()).Msg("failed to provide non-empty path")
return zerr.ErrEmptyValue
}
// use only relative (to rootDir) paths on blobs
var err error
if d.useRelPaths {
path, err = filepath.Rel(d.rootDir, path)
if err != nil {
d.log.Error().Err(err).Str("path", path).Msg("failed to get relative path")
}
}
if len(path) == 0 {
return zerr.ErrEmptyValue
}
lock := d.rs.NewMutex(d.join(constants.RedisLocksBucket, digest.String()))
err = lock.Lock()
if err != nil {
d.log.Error().Err(err).Str("digest", digest.String()).Msg("failed to acquire redis lock")
return err
}
defer func() {
if _, err := lock.Unlock(); err != nil {
d.log.Error().Err(err).Str("digest", digest.String()).Msg("failed to release redis lock")
}
}()
// see if the blob digest exists.
exists, err := d.db.HExists(ctx, d.join(constants.BlobsCache, constants.OriginalBucket), digest.String()).Result()
if err != nil {
return err
}
if _, err := d.db.TxPipelined(ctx, func(txrp redis.Pipeliner) error {
if !exists {
// add the key value pair [digest, path] to blobs:origin if not
// exist already. the path becomes the canonical blob we do this in
// a transaction to make sure that if something is in the set, then
// it is guaranteed to always have a path
// note that there is a race, but the worst case is that a different
// origin path that is still valid is used.
if err := txrp.HSet(ctx, d.join(constants.BlobsCache, constants.OriginalBucket),
digest.String(), path).Err(); err != nil {
d.log.Error().Err(err).Str("hset", d.join(constants.BlobsCache, constants.OriginalBucket)).
Str("value", path).Msg("unable to put record")
return err
}
}
// add path to the set of paths which the digest represents
if err := txrp.SAdd(ctx, d.join(constants.BlobsCache, constants.DuplicatesBucket,
digest.String()), path).Err(); err != nil {
d.log.Error().Err(err).Str("sadd", d.join(constants.BlobsCache, constants.DuplicatesBucket, digest.String())).
Str("value", path).Msg("unable to put record")
return err
}
return nil
}); err != nil {
return err
}
return nil
}
func (d *RedisDriver) GetBlob(digest godigest.Digest) (string, error) {
ctx := context.TODO()
path, err := d.db.HGet(ctx, d.join(constants.BlobsCache, constants.OriginalBucket), digest.String()).Result()
if err != nil {
if goerrors.Is(err, redis.Nil) {
return "", zerr.ErrCacheMiss
}
d.log.Error().Err(err).Str("hget", d.join(constants.BlobsCache, constants.OriginalBucket)).
Str("digest", digest.String()).Msg("unable to get record")
return "", err
}
return path, nil
}
func (d *RedisDriver) GetAllBlobs(digest godigest.Digest) ([]string, error) {
blobPaths := []string{}
ctx := context.TODO()
originalPath, err := d.db.HGet(ctx, d.join(constants.BlobsCache, constants.OriginalBucket), digest.String()).Result()
if err != nil {
if goerrors.Is(err, redis.Nil) {
return nil, zerr.ErrCacheMiss
}
d.log.Error().Err(err).Str("hget", d.join(constants.BlobsCache, constants.OriginalBucket)).
Str("digest", digest.String()).Msg("unable to get record")
return nil, err
}
blobPaths = append(blobPaths, originalPath)
// see if we are in the set
duplicateBlobPaths, err := d.db.SMembers(ctx, d.join(constants.BlobsCache, constants.DuplicatesBucket,
digest.String())).Result()
if err != nil {
d.log.Error().Err(err).Str("smembers", d.join(constants.BlobsCache, constants.DuplicatesBucket, digest.String())).
Str("digest", digest.String()).Msg("unable to get record")
return nil, err
}
for _, item := range duplicateBlobPaths {
if item != originalPath {
blobPaths = append(blobPaths, item)
}
}
return blobPaths, nil
}
func (d *RedisDriver) HasBlob(digest godigest.Digest, path string) bool {
var err error
if d.useRelPaths {
path, err = filepath.Rel(d.rootDir, path)
if err != nil {
d.log.Error().Err(err).Str("path", path).Msg("failed to get relative path")
}
}
if len(path) == 0 {
return false
}
ctx := context.TODO()
// see if we are in the set
exists, err := d.db.SIsMember(ctx, d.join(constants.BlobsCache, constants.DuplicatesBucket,
digest.String()), path).Result()
if err != nil {
d.log.Error().Err(err).Str("sismember", d.join(constants.BlobsCache, constants.DuplicatesBucket, digest.String())).
Str("digest", digest.String()).Msg("unable to get record")
return false
}
if !exists {
return false
}
// see if the path entry exists. is this actually needed? i guess it doesn't really hurt (it is fast)
exists, err = d.db.HExists(ctx, d.join(constants.BlobsCache, constants.OriginalBucket), digest.String()).Result()
d.log.Error().Err(err).Str("hexists", d.join(constants.BlobsCache, constants.OriginalBucket)).
Str("digest", digest.String()).Msg("unable to get record")
if err != nil {
return false
}
if !exists {
return false
}
return true
}
func (d *RedisDriver) DeleteBlob(digest godigest.Digest, path string) error {
ctx := context.TODO()
// use only relative (to rootDir) paths on blobs
var err error
if d.useRelPaths {
path, err = filepath.Rel(d.rootDir, path)
if err != nil {
d.log.Error().Err(err).Str("path", path).Msg("failed to get relative path")
}
}
lock := d.rs.NewMutex(d.join(constants.RedisLocksBucket, digest.String()))
err = lock.Lock()
if err != nil {
d.log.Error().Err(err).Str("digest", digest.String()).Msg("failed to acquire redis lock")
return err
}
defer func() {
if _, err := lock.Unlock(); err != nil {
d.log.Error().Err(err).Str("digest", digest.String()).Msg("failed to release redis lock")
}
}()
pathSet := d.join(constants.BlobsCache, constants.DuplicatesBucket, digest.String())
// delete path from the set of paths which the digest represents
_, err = d.db.SRem(ctx, pathSet, path).Result()
if err != nil {
d.log.Error().Err(err).Str("srem", pathSet).Str("value", path).Msg("failed to delete record")
return err
}
currentPath, err := d.GetBlob(digest)
if err != nil {
return err
}
if currentPath != path {
// nothing we need to do, return nil yay
return nil
}
// we need to set a new path
newPath, err := d.db.SRandMember(ctx, pathSet).Result()
if err != nil {
if goerrors.Is(err, redis.Nil) {
_, err := d.db.HDel(ctx, d.join(constants.BlobsCache, constants.OriginalBucket), digest.String()).Result()
if err != nil {
return err
}
return nil
}
d.log.Error().Err(err).Str("srandmember", pathSet).Msg("failed to get new path")
return err
}
if _, err := d.db.HSet(ctx, d.join(constants.BlobsCache, constants.OriginalBucket),
digest.String(), newPath).Result(); err != nil {
d.log.Error().Err(err).Str("hset", d.join(constants.BlobsCache, constants.OriginalBucket)).Str("value", newPath).
Msg("unable to put record")
return err
}
return nil
}
+712
View File
@@ -0,0 +1,712 @@
package cache_test
import (
"errors"
"fmt"
"path"
"testing"
"time"
"github.com/alicebob/miniredis/v2"
"github.com/go-redis/redismock/v9"
"github.com/redis/go-redis/v9"
. "github.com/smartystreets/goconvey/convey"
zerr "zotregistry.dev/zot/errors"
"zotregistry.dev/zot/pkg/log"
"zotregistry.dev/zot/pkg/storage"
"zotregistry.dev/zot/pkg/storage/cache"
"zotregistry.dev/zot/pkg/storage/constants"
test "zotregistry.dev/zot/pkg/test/common"
)
var ErrTestError = errors.New("TestError")
func TestRedisCache(t *testing.T) {
miniRedis := miniredis.RunT(t)
Convey("Make a new cache", t, func() {
dir := t.TempDir()
log := log.NewLogger("debug", "")
So(log, ShouldNotBeNil)
cacheDriver, err := storage.Create("redis", "failTypeAssertion", log)
So(cacheDriver, ShouldBeNil)
So(err, ShouldNotBeNil)
connOpts, _ := redis.ParseURL("redis://" + miniRedis.Addr())
client := redis.NewClient(connOpts)
cacheDriver, err = storage.Create("redis",
cache.RedisDriverParameters{client, dir, true, "zot"}, log)
So(cacheDriver, ShouldNotBeNil)
So(err, ShouldBeNil)
name := cacheDriver.Name()
So(name, ShouldEqual, "redis")
val, err := cacheDriver.GetBlob("key")
So(err, ShouldEqual, zerr.ErrCacheMiss)
So(val, ShouldBeEmpty)
exists := cacheDriver.HasBlob("key", path.Join(dir, "value"))
So(exists, ShouldBeFalse)
exists = cacheDriver.HasBlob("key", "value")
So(exists, ShouldBeFalse)
err = cacheDriver.PutBlob("key", path.Join(dir, "value"))
So(err, ShouldBeNil)
err = cacheDriver.PutBlob("key", "value")
So(err, ShouldNotBeNil)
exists = cacheDriver.HasBlob("key", path.Join(dir, "value"))
So(exists, ShouldBeTrue)
val, err = cacheDriver.GetBlob("key")
So(err, ShouldBeNil)
So(val, ShouldNotBeEmpty)
err = cacheDriver.DeleteBlob("bogusKey", "bogusValue")
So(err, ShouldEqual, zerr.ErrCacheMiss)
err = cacheDriver.DeleteBlob("key", "bogusValue")
So(err, ShouldBeNil)
// try to insert empty path
err = cacheDriver.PutBlob("key", "")
So(err, ShouldNotBeNil)
So(err, ShouldEqual, zerr.ErrEmptyValue)
connOpts, _ = redis.ParseURL("redis://" + miniRedis.Addr() + "/5")
client = redis.NewClient(connOpts)
cacheDriver, err = storage.Create("redis",
cache.RedisDriverParameters{client, t.TempDir(), false, "zot"}, log)
So(cacheDriver, ShouldNotBeNil)
So(err, ShouldBeNil)
err = cacheDriver.PutBlob("key1", "originalBlobPath")
So(err, ShouldBeNil)
err = cacheDriver.PutBlob("key1", "duplicateBlobPath")
So(err, ShouldBeNil)
val, err = cacheDriver.GetBlob("key1")
So(val, ShouldEqual, "originalBlobPath")
So(err, ShouldBeNil)
err = cacheDriver.DeleteBlob("key1", "duplicateBlobPath")
So(err, ShouldBeNil)
val, err = cacheDriver.GetBlob("key1")
So(val, ShouldEqual, "originalBlobPath")
So(err, ShouldBeNil)
err = cacheDriver.PutBlob("key1", "duplicateBlobPath")
So(err, ShouldBeNil)
err = cacheDriver.DeleteBlob("key1", "originalBlobPath")
So(err, ShouldBeNil)
val, err = cacheDriver.GetBlob("key1")
So(val, ShouldEqual, "duplicateBlobPath")
So(err, ShouldBeNil)
err = cacheDriver.DeleteBlob("key1", "duplicateBlobPath")
So(err, ShouldBeNil)
// should be empty
val, err = cacheDriver.GetBlob("key1")
So(err, ShouldNotBeNil)
So(val, ShouldBeEmpty)
// try to add three same values
err = cacheDriver.PutBlob("key2", "duplicate")
So(err, ShouldBeNil)
err = cacheDriver.PutBlob("key2", "duplicate")
So(err, ShouldBeNil)
err = cacheDriver.PutBlob("key2", "duplicate")
So(err, ShouldBeNil)
val, err = cacheDriver.GetBlob("key2")
So(val, ShouldEqual, "duplicate")
So(err, ShouldBeNil)
err = cacheDriver.DeleteBlob("key2", "duplicate")
So(err, ShouldBeNil)
// should be empty
val, err = cacheDriver.GetBlob("key2")
So(err, ShouldNotBeNil)
So(val, ShouldBeEmpty)
})
Convey("Test cache.GetAllBlos()", t, func() {
dir := t.TempDir()
log := log.NewLogger("debug", "")
So(log, ShouldNotBeNil)
connOpts, _ := redis.ParseURL("redis://" + miniRedis.Addr())
client := redis.NewClient(connOpts)
cacheDriver, err := storage.Create("redis",
cache.RedisDriverParameters{client, dir, true, "zot"}, log)
So(cacheDriver, ShouldNotBeNil)
So(err, ShouldBeNil)
name := cacheDriver.Name()
So(name, ShouldEqual, "redis")
blobs, err := cacheDriver.GetAllBlobs("digest")
So(err, ShouldEqual, zerr.ErrCacheMiss)
So(blobs, ShouldBeNil)
err = cacheDriver.PutBlob("digest", path.Join(dir, "first"))
So(err, ShouldBeNil)
err = cacheDriver.PutBlob("digest", path.Join(dir, "second"))
So(err, ShouldBeNil)
err = cacheDriver.PutBlob("digest", path.Join(dir, "third"))
So(err, ShouldBeNil)
blobs, err = cacheDriver.GetAllBlobs("digest")
So(err, ShouldBeNil)
So(blobs, ShouldResemble, []string{"first", "second", "third"})
err = cacheDriver.DeleteBlob("digest", path.Join(dir, "first"))
So(err, ShouldBeNil)
blobs, err = cacheDriver.GetAllBlobs("digest")
So(err, ShouldBeNil)
So(len(blobs), ShouldEqual, 2)
So(blobs, ShouldContain, "second")
So(blobs, ShouldContain, "third")
err = cacheDriver.DeleteBlob("digest", path.Join(dir, "third"))
So(err, ShouldBeNil)
blobs, err = cacheDriver.GetAllBlobs("digest")
So(err, ShouldBeNil)
So(blobs, ShouldResemble, []string{"second"})
})
}
func TestRedisCacheError(t *testing.T) {
Convey("Make a new cache", t, func() {
dir := t.TempDir()
redisURL := "redis://127.0.0.1:" + test.GetFreePort()
connOpts, _ := redis.ParseURL(redisURL)
brokenClient := redis.NewClient(connOpts)
log := log.NewLogger("debug", "")
So(log, ShouldNotBeNil)
// redis server is not running
cacheDriver, err := storage.Create("redis",
cache.RedisDriverParameters{brokenClient, dir, true, "zot"}, log)
So(err, ShouldNotBeNil)
So(cacheDriver, ShouldBeNil)
})
Convey("Redis unreachable", t, func() {
miniRedis := miniredis.RunT(t)
dir := t.TempDir()
log := log.NewLogger("debug", "")
So(log, ShouldNotBeNil)
connOpts, _ := redis.ParseURL("redis://" + miniRedis.Addr())
workingClient := redis.NewClient(connOpts)
redisURL := "redis://127.0.0.1:" + test.GetFreePort() // must not match miniRedis.Addr()
connOpts, _ = redis.ParseURL(redisURL)
brokenClient := redis.NewClient(connOpts)
cacheDriver, err := cache.NewRedisCache(
cache.RedisDriverParameters{workingClient, dir, false, "zot"}, log)
So(cacheDriver, ShouldNotBeNil)
So(err, ShouldBeNil)
// replace the working driver with the broken one
cacheDriver.SetClient(brokenClient)
err = cacheDriver.PutBlob("key", "val")
So(err, ShouldNotBeNil)
found := cacheDriver.HasBlob("key", "val")
So(found, ShouldEqual, false)
_, err = cacheDriver.GetBlob("key")
So(err, ShouldNotBeNil)
_, err = cacheDriver.GetAllBlobs("key")
So(err, ShouldNotBeNil)
err = cacheDriver.DeleteBlob("key", "val")
So(err, ShouldNotBeNil)
})
}
func TestRedisMocked(t *testing.T) {
Convey("Redis tests using mocks", t, func() {
dir := t.TempDir()
log := log.NewLogger("debug", "")
So(log, ShouldNotBeNil)
tests := []cache.RedisDriverParameters{
{
RootDir: dir,
UseRelPaths: true,
}, {
RootDir: dir,
UseRelPaths: false,
}, {
RootDir: dir,
UseRelPaths: true,
KeyPrefix: "someprefix",
}, {
RootDir: dir,
UseRelPaths: true,
KeyPrefix: "zot",
},
}
for i, redisDriverParams := range tests {
testID := fmt.Sprintf(" %d", i)
keyPrefix := redisDriverParams.KeyPrefix
if len(keyPrefix) == 0 {
// check default
keyPrefix = "zot"
}
keyPrefix += ":"
// depending on UseRelPaths value we check the relative or absolute value
// in results using path.Join(pathPrefix, path) in both cases
pathPrefix := ""
if !redisDriverParams.UseRelPaths {
pathPrefix = redisDriverParams.RootDir
}
Convey("PutBlob HExists error"+testID, func() {
// initialize mock client
cacheDB, mock := redismock.NewClientMock()
redisDriverParams.Client = cacheDB
mock.ExpectPing().SetVal("OK")
cacheDriver, err := cache.NewRedisCache(redisDriverParams, log)
So(cacheDriver, ShouldNotBeNil)
So(err, ShouldBeNil)
mock.Regexp().ExpectSetNX(keyPrefix+"locks:key", `.*`, 8*time.Second).SetVal(true)
mock.ExpectHExists(keyPrefix+constants.BlobsCache+":"+constants.OriginalBucket, "key").
SetErr(ErrTestError)
err = cacheDriver.PutBlob("key", path.Join(dir, "val"))
So(err, ShouldEqual, ErrTestError)
err = mock.ExpectationsWereMet()
So(err, ShouldBeNil)
})
Convey("PutBlob HSet error"+testID, func() {
// initialize mock client
cacheDB, mock := redismock.NewClientMock()
redisDriverParams.Client = cacheDB
mock.ExpectPing().SetVal("OK")
cacheDriver, err := cache.NewRedisCache(redisDriverParams, log)
So(cacheDriver, ShouldNotBeNil)
So(err, ShouldBeNil)
mock.Regexp().ExpectSetNX(keyPrefix+"locks:key", `.*`, 8*time.Second).SetVal(true)
mock.ExpectHExists(keyPrefix+constants.BlobsCache+":"+constants.OriginalBucket, "key").
SetVal(false)
mock.ExpectTxPipeline()
mock.ExpectHSet(keyPrefix+constants.BlobsCache+":"+constants.OriginalBucket, "key",
path.Join(pathPrefix, "val")).SetErr(ErrTestError)
err = cacheDriver.PutBlob("key", path.Join(dir, "val"))
So(err, ShouldEqual, ErrTestError)
err = mock.ExpectationsWereMet()
So(err, ShouldBeNil)
})
Convey("PutBlob SAdd error"+testID, func() {
// initialize mock client
cacheDB, mock := redismock.NewClientMock()
redisDriverParams.Client = cacheDB
mock.ExpectPing().SetVal("OK")
cacheDriver, err := cache.NewRedisCache(redisDriverParams, log)
So(cacheDriver, ShouldNotBeNil)
So(err, ShouldBeNil)
mock.Regexp().ExpectSetNX(keyPrefix+"locks:key", `.*`, 8*time.Second).SetVal(true)
mock.ExpectHExists(keyPrefix+constants.BlobsCache+":"+constants.OriginalBucket, "key").
SetVal(false)
mock.ExpectTxPipeline()
mock.ExpectHSet(keyPrefix+constants.BlobsCache+":"+constants.OriginalBucket, "key",
path.Join(pathPrefix, "val")).SetVal(1)
mock.ExpectSAdd(keyPrefix+constants.BlobsCache+":"+constants.DuplicatesBucket+":key",
path.Join(pathPrefix, "val")).SetErr(ErrTestError)
err = cacheDriver.PutBlob("key", path.Join(dir, "val"))
So(err, ShouldEqual, ErrTestError)
err = mock.ExpectationsWereMet()
So(err, ShouldBeNil)
})
Convey("PutBlob succeeds original bucket is created"+testID, func() {
// initialize mock client
cacheDB, mock := redismock.NewClientMock()
redisDriverParams.Client = cacheDB
mock.ExpectPing().SetVal("OK")
cacheDriver, err := cache.NewRedisCache(redisDriverParams, log)
So(cacheDriver, ShouldNotBeNil)
So(err, ShouldBeNil)
mock.Regexp().ExpectSetNX(keyPrefix+"locks:key", `.*`, 8*time.Second).SetVal(true)
mock.ExpectHExists(keyPrefix+constants.BlobsCache+":"+constants.OriginalBucket, "key").
SetVal(false)
mock.ExpectTxPipeline()
mock.ExpectHSet(keyPrefix+constants.BlobsCache+":"+constants.OriginalBucket, "key",
path.Join(pathPrefix, "val")).SetVal(1)
mock.ExpectSAdd(keyPrefix+constants.BlobsCache+":"+constants.DuplicatesBucket+":key",
path.Join(pathPrefix, "val")).SetVal(1)
mock.ExpectTxPipelineExec()
err = cacheDriver.PutBlob("key", path.Join(dir, "val"))
So(err, ShouldBeNil)
err = mock.ExpectationsWereMet()
So(err, ShouldBeNil)
})
Convey("PutBlob succeeds original bucket is reused"+testID, func() {
// initialize mock client
cacheDB, mock := redismock.NewClientMock()
redisDriverParams.Client = cacheDB
mock.ExpectPing().SetVal("OK")
cacheDriver, err := cache.NewRedisCache(redisDriverParams, log)
So(cacheDriver, ShouldNotBeNil)
So(err, ShouldBeNil)
mock.Regexp().ExpectSetNX(keyPrefix+"locks:key", `.*`, 8*time.Second).SetVal(true)
mock.ExpectHExists(keyPrefix+constants.BlobsCache+":"+constants.OriginalBucket, "key").
SetVal(true)
mock.ExpectTxPipeline()
mock.ExpectSAdd(keyPrefix+constants.BlobsCache+":"+constants.DuplicatesBucket+":key",
path.Join(pathPrefix, "val")).SetVal(1)
mock.ExpectTxPipelineExec()
err = cacheDriver.PutBlob("key", path.Join(dir, "val"))
So(err, ShouldBeNil)
err = mock.ExpectationsWereMet()
So(err, ShouldBeNil)
})
Convey("SMembers error in GetAllBlobs"+testID, func() {
// initialize mock client
cacheDB, mock := redismock.NewClientMock()
redisDriverParams.Client = cacheDB
mock.ExpectPing().SetVal("OK")
cacheDriver, err := cache.NewRedisCache(redisDriverParams, log)
So(cacheDriver, ShouldNotBeNil)
So(err, ShouldBeNil)
mock.ExpectHGet(keyPrefix+constants.BlobsCache+":"+constants.OriginalBucket, "key").
SetVal(path.Join(pathPrefix, "val"))
mock.ExpectSMembers(keyPrefix + constants.BlobsCache + ":" + constants.DuplicatesBucket + ":key").
SetErr(ErrTestError)
_, err = cacheDriver.GetAllBlobs("key")
So(err, ShouldEqual, ErrTestError)
err = mock.ExpectationsWereMet()
So(err, ShouldBeNil)
})
Convey("GetAllBlobs succeeds"+testID, func() {
// initialize mock client
cacheDB, mock := redismock.NewClientMock()
redisDriverParams.Client = cacheDB
mock.ExpectPing().SetVal("OK")
cacheDriver, err := cache.NewRedisCache(redisDriverParams, log)
So(cacheDriver, ShouldNotBeNil)
So(err, ShouldBeNil)
mock.Regexp().ExpectSetNX(keyPrefix+"locks:key", `.*`, 8*time.Second).SetVal(true)
mock.ExpectHExists(keyPrefix+constants.BlobsCache+":"+constants.OriginalBucket, "key").
SetVal(false)
mock.ExpectTxPipeline()
mock.ExpectHSet(keyPrefix+constants.BlobsCache+":"+constants.OriginalBucket, "key",
path.Join(pathPrefix, "val1")).SetVal(1)
mock.ExpectSAdd(keyPrefix+constants.BlobsCache+":"+constants.DuplicatesBucket+":key",
path.Join(pathPrefix, "val1")).SetVal(1)
mock.ExpectTxPipelineExec()
err = cacheDriver.PutBlob("key", path.Join(dir, "val1"))
So(err, ShouldBeNil)
mock.Regexp().ExpectSetNX(keyPrefix+"locks:key", `.*`, 8*time.Second).SetVal(true)
mock.ExpectHExists(keyPrefix+constants.BlobsCache+":"+constants.OriginalBucket, "key").
SetVal(false)
mock.ExpectTxPipeline()
mock.ExpectHSet(keyPrefix+constants.BlobsCache+":"+constants.OriginalBucket, "key",
path.Join(pathPrefix, "val2")).SetVal(1)
mock.ExpectSAdd(keyPrefix+constants.BlobsCache+":"+constants.DuplicatesBucket+":key",
path.Join(pathPrefix, "val2")).SetVal(1)
mock.ExpectTxPipelineExec()
err = cacheDriver.PutBlob("key", path.Join(dir, "val2"))
So(err, ShouldBeNil)
mock.ExpectHGet(keyPrefix+constants.BlobsCache+":"+constants.OriginalBucket, "key").
SetVal(path.Join(pathPrefix, "val1"))
mock.ExpectSMembers(keyPrefix + constants.BlobsCache + ":" + constants.DuplicatesBucket + ":key").
SetVal([]string{path.Join(pathPrefix, "val1"), path.Join(pathPrefix, "val2")})
allBlobs, err := cacheDriver.GetAllBlobs("key")
So(err, ShouldBeNil)
So(allBlobs, ShouldResemble, []string{path.Join(pathPrefix, "val1"), path.Join(pathPrefix, "val2")})
err = mock.ExpectationsWereMet()
So(err, ShouldBeNil)
})
Convey("HasBlob HExists returns error"+testID, func() {
// initialize mock client
cacheDB, mock := redismock.NewClientMock()
redisDriverParams.Client = cacheDB
mock.ExpectPing().SetVal("OK")
cacheDriver, err := cache.NewRedisCache(redisDriverParams, log)
So(cacheDriver, ShouldNotBeNil)
So(err, ShouldBeNil)
mock.ExpectSIsMember(keyPrefix+constants.BlobsCache+":"+constants.DuplicatesBucket+":key",
path.Join(pathPrefix, "val")).SetVal(true)
mock.ExpectHExists(keyPrefix+constants.BlobsCache+":"+constants.OriginalBucket, "key").
SetErr(ErrTestError)
ok := cacheDriver.HasBlob("key", path.Join(dir, "val"))
So(ok, ShouldBeFalse)
err = mock.ExpectationsWereMet()
So(err, ShouldBeNil)
})
Convey("HasBlob SIsMember returns error"+testID, func() {
// initialize mock client
cacheDB, mock := redismock.NewClientMock()
redisDriverParams.Client = cacheDB
mock.ExpectPing().SetVal("OK")
cacheDriver, err := cache.NewRedisCache(redisDriverParams, log)
So(cacheDriver, ShouldNotBeNil)
So(err, ShouldBeNil)
mock.ExpectSIsMember(keyPrefix+constants.BlobsCache+":"+constants.DuplicatesBucket+":key",
path.Join(pathPrefix, "val")).SetErr(ErrTestError)
ok := cacheDriver.HasBlob("key", path.Join(dir, "val"))
So(ok, ShouldBeFalse)
err = mock.ExpectationsWereMet()
So(err, ShouldBeNil)
})
Convey("HasBlob HExists returns false"+testID, func() {
// initialize mock client
cacheDB, mock := redismock.NewClientMock()
redisDriverParams.Client = cacheDB
mock.ExpectPing().SetVal("OK")
cacheDriver, err := cache.NewRedisCache(redisDriverParams, log)
So(cacheDriver, ShouldNotBeNil)
So(err, ShouldBeNil)
mock.ExpectSIsMember(keyPrefix+constants.BlobsCache+":"+constants.DuplicatesBucket+":key",
path.Join(pathPrefix, "val")).SetVal(true)
mock.ExpectHExists(keyPrefix+constants.BlobsCache+":"+constants.OriginalBucket, "key").
SetVal(false)
ok := cacheDriver.HasBlob("key", path.Join(dir, "val"))
So(ok, ShouldBeFalse)
err = mock.ExpectationsWereMet()
So(err, ShouldBeNil)
})
Convey("DeleteBlob tests"+testID, func() {
// initialize mock client
cacheDB, mock := redismock.NewClientMock()
redisDriverParams.Client = cacheDB
mock.ExpectPing().SetVal("OK")
cacheDriver, err := cache.NewRedisCache(redisDriverParams, log)
So(cacheDriver, ShouldNotBeNil)
So(err, ShouldBeNil)
// Create entry for 1st path
mock.Regexp().ExpectSetNX(keyPrefix+"locks:key", `.*`, 8*time.Second).SetVal(true)
mock.ExpectHExists(keyPrefix+constants.BlobsCache+":"+constants.OriginalBucket, "key").
SetVal(false)
mock.ExpectTxPipeline()
mock.ExpectHSet(keyPrefix+constants.BlobsCache+":"+constants.OriginalBucket, "key",
path.Join(pathPrefix, "val1")).SetVal(1)
mock.ExpectSAdd(keyPrefix+constants.BlobsCache+":"+constants.DuplicatesBucket+":key",
path.Join(pathPrefix, "val1")).SetVal(1)
mock.ExpectTxPipelineExec()
err = cacheDriver.PutBlob("key", path.Join(dir, "val1"))
So(err, ShouldBeNil)
Convey("DeleteBlob error in HDel"+testID, func() {
// If the 2nd path does not exist, HDel is callled
// Error switching to new path
mock.Regexp().ExpectSetNX(keyPrefix+"locks:key", `.*`, 8*time.Second).SetVal(true)
mock.ExpectSRem(keyPrefix+constants.BlobsCache+":"+constants.DuplicatesBucket+":key",
path.Join(pathPrefix, "val1")).SetVal(1)
mock.ExpectHGet(keyPrefix+constants.BlobsCache+":"+constants.OriginalBucket, "key").
SetVal(path.Join(pathPrefix, "val1"))
// failed to get new path
mock.ExpectSRandMember(keyPrefix + constants.BlobsCache + ":" + constants.DuplicatesBucket + ":key").
RedisNil()
mock.ExpectHDel(keyPrefix+constants.BlobsCache+":"+constants.OriginalBucket, "key").
SetErr(ErrTestError)
err = cacheDriver.DeleteBlob("key", path.Join(dir, "val1"))
So(err, ShouldEqual, ErrTestError)
})
Convey("DeleteBlob succeeds in deleting all data for original blob"+testID, func() {
// If the 2nd path does not exist, HDel is callled
// Error switching to new path
mock.Regexp().ExpectSetNX(keyPrefix+"locks:key", `.*`, 8*time.Second).SetVal(true)
mock.ExpectSRem(keyPrefix+constants.BlobsCache+":"+constants.DuplicatesBucket+":key",
path.Join(pathPrefix, "val1")).SetVal(1)
mock.ExpectHGet(keyPrefix+constants.BlobsCache+":"+constants.OriginalBucket, "key").
SetVal(path.Join(pathPrefix, "val1"))
// failed to get new path
mock.ExpectSRandMember(keyPrefix + constants.BlobsCache + ":" + constants.DuplicatesBucket + ":key").
RedisNil()
mock.ExpectHDel(keyPrefix+constants.BlobsCache+":"+constants.OriginalBucket, "key").
SetVal(1)
err = cacheDriver.DeleteBlob("key", path.Join(dir, "val1"))
So(err, ShouldBeNil)
})
Convey("DeleteBlob error in SRandMember"+testID, func() {
// Create entry for 2nd path
mock.Regexp().ExpectSetNX(keyPrefix+"locks:key", `.*`, 8*time.Second).SetVal(true)
mock.ExpectHExists(keyPrefix+constants.BlobsCache+":"+constants.OriginalBucket, "key").
SetVal(false)
mock.ExpectTxPipeline()
mock.ExpectHSet(keyPrefix+constants.BlobsCache+":"+constants.OriginalBucket, "key",
path.Join(pathPrefix, "val2")).SetVal(1)
mock.ExpectSAdd(keyPrefix+constants.BlobsCache+":"+constants.DuplicatesBucket+":key",
path.Join(pathPrefix, "val2")).SetVal(1)
mock.ExpectTxPipelineExec()
err = cacheDriver.PutBlob("key", path.Join(dir, "val2"))
So(err, ShouldBeNil)
// Error switching to new path
mock.Regexp().ExpectSetNX(keyPrefix+"locks:key", `.*`, 8*time.Second).SetVal(true)
mock.ExpectSRem(keyPrefix+constants.BlobsCache+":"+constants.DuplicatesBucket+":key",
path.Join(pathPrefix, "val1")).SetVal(1)
mock.ExpectHGet(keyPrefix+constants.BlobsCache+":"+constants.OriginalBucket, "key").
SetVal(path.Join(pathPrefix, "val1"))
// failed to get new path
mock.ExpectSRandMember(keyPrefix + constants.BlobsCache + ":" + constants.DuplicatesBucket + ":key").
SetErr(ErrTestError)
err = cacheDriver.DeleteBlob("key", path.Join(dir, "val1"))
So(err, ShouldEqual, ErrTestError)
})
Convey("DeleteBlob error in HSet"+testID, func() {
// Create entry for 2nd path
mock.Regexp().ExpectSetNX(keyPrefix+"locks:key", `.*`, 8*time.Second).SetVal(true)
mock.ExpectHExists(keyPrefix+constants.BlobsCache+":"+constants.OriginalBucket, "key").
SetVal(false)
mock.ExpectTxPipeline()
mock.ExpectHSet(keyPrefix+constants.BlobsCache+":"+constants.OriginalBucket, "key",
path.Join(pathPrefix, "val2")).SetVal(1)
mock.ExpectSAdd(keyPrefix+constants.BlobsCache+":"+constants.DuplicatesBucket+":key",
path.Join(pathPrefix, "val2")).SetVal(1)
mock.ExpectTxPipelineExec()
err = cacheDriver.PutBlob("key", path.Join(dir, "val2"))
So(err, ShouldBeNil)
// Error switching to new path
mock.Regexp().ExpectSetNX(keyPrefix+"locks:key", `.*`, 8*time.Second).SetVal(true)
mock.ExpectSRem(keyPrefix+constants.BlobsCache+":"+constants.DuplicatesBucket+":key",
path.Join(pathPrefix, "val1")).SetVal(1)
mock.ExpectHGet(keyPrefix+constants.BlobsCache+":"+constants.OriginalBucket, "key").
SetVal(path.Join(pathPrefix, "val1"))
mock.ExpectSRandMember(keyPrefix + constants.BlobsCache + ":" + constants.DuplicatesBucket + ":key").
SetVal(path.Join(pathPrefix, "val2"))
mock.ExpectHSet(keyPrefix+constants.BlobsCache+":"+constants.OriginalBucket, "key",
path.Join(pathPrefix, "val2")).SetErr(ErrTestError)
err = cacheDriver.DeleteBlob("key", path.Join(dir, "val1"))
So(err, ShouldEqual, ErrTestError)
})
Convey("DeleteBlob succeeds in switching original blob path"+testID, func() {
// Create entry for 2nd path
mock.Regexp().ExpectSetNX(keyPrefix+"locks:key", `.*`, 8*time.Second).SetVal(true)
mock.ExpectHExists(keyPrefix+constants.BlobsCache+":"+constants.OriginalBucket, "key").
SetVal(false)
mock.ExpectTxPipeline()
mock.ExpectHSet(keyPrefix+constants.BlobsCache+":"+constants.OriginalBucket, "key",
path.Join(pathPrefix, "val2")).SetVal(1)
mock.ExpectSAdd(keyPrefix+constants.BlobsCache+":"+constants.DuplicatesBucket+":key",
path.Join(pathPrefix, "val2")).SetVal(1)
mock.ExpectTxPipelineExec()
err = cacheDriver.PutBlob("key", path.Join(dir, "val2"))
So(err, ShouldBeNil)
mock.Regexp().ExpectSetNX(keyPrefix+"locks:key", `.*`, 8*time.Second).SetVal(true)
mock.ExpectSRem(keyPrefix+constants.BlobsCache+":"+constants.DuplicatesBucket+":key",
path.Join(pathPrefix, "val1")).SetVal(1)
mock.ExpectHGet(keyPrefix+constants.BlobsCache+":"+constants.OriginalBucket, "key").
SetVal(path.Join(pathPrefix, "val1"))
mock.ExpectSRandMember(keyPrefix + constants.BlobsCache + ":" + constants.DuplicatesBucket + ":key").
SetVal(path.Join(pathPrefix, "val2"))
mock.ExpectHSet(keyPrefix+constants.BlobsCache+":"+constants.OriginalBucket, "key",
path.Join(pathPrefix, "val2")).SetVal(1)
err = cacheDriver.DeleteBlob("key", path.Join(dir, "val1"))
So(err, ShouldBeNil)
})
err = mock.ExpectationsWereMet()
So(err, ShouldBeNil)
})
}
})
}