mirror of
https://github.com/project-zot/zot.git
synced 2026-06-17 04:48:26 +08:00
feat(cache): dynamodb implementation (#953)
Signed-off-by: Catalin Hofnar <catalin.hofnar@gmail.com>
This commit is contained in:
committed by
GitHub
parent
49c3d05706
commit
31b9481713
Vendored
+9
-8
@@ -32,18 +32,14 @@ func NewBoltDBCache(parameters interface{}, log zlog.Logger) Cache {
|
||||
panic("Failed type assertion")
|
||||
}
|
||||
|
||||
return NewCache(properParameters, log)
|
||||
}
|
||||
|
||||
func NewCache(parameters BoltDBDriverParameters, log zlog.Logger) *BoltDBDriver {
|
||||
err := os.MkdirAll(parameters.RootDir, constants.DefaultDirPerms)
|
||||
err := os.MkdirAll(properParameters.RootDir, constants.DefaultDirPerms)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msgf("unable to create directory for cache db: %v", parameters.RootDir)
|
||||
log.Error().Err(err).Msgf("unable to create directory for cache db: %v", properParameters.RootDir)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
dbPath := path.Join(parameters.RootDir, parameters.Name+constants.DBExtensionName)
|
||||
dbPath := path.Join(properParameters.RootDir, properParameters.Name+constants.DBExtensionName)
|
||||
dbOpts := &bbolt.Options{
|
||||
Timeout: constants.DBCacheLockCheckTimeout,
|
||||
FreelistType: bbolt.FreelistArrayType,
|
||||
@@ -72,7 +68,12 @@ func NewCache(parameters BoltDBDriverParameters, log zlog.Logger) *BoltDBDriver
|
||||
return nil
|
||||
}
|
||||
|
||||
return &BoltDBDriver{rootDir: parameters.RootDir, db: cacheDB, useRelPaths: parameters.UseRelPaths, log: log}
|
||||
return &BoltDBDriver{
|
||||
rootDir: properParameters.RootDir,
|
||||
db: cacheDB,
|
||||
useRelPaths: properParameters.UseRelPaths,
|
||||
log: log,
|
||||
}
|
||||
}
|
||||
|
||||
func (d *BoltDBDriver) Name() string {
|
||||
|
||||
Vendored
+216
@@ -0,0 +1,216 @@
|
||||
package cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
"github.com/aws/aws-sdk-go-v2/config"
|
||||
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
|
||||
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
|
||||
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
|
||||
godigest "github.com/opencontainers/go-digest"
|
||||
|
||||
zerr "zotregistry.io/zot/errors"
|
||||
zlog "zotregistry.io/zot/pkg/log"
|
||||
)
|
||||
|
||||
type DynamoDBDriver struct {
|
||||
client *dynamodb.Client
|
||||
log zlog.Logger
|
||||
tableName string
|
||||
}
|
||||
|
||||
type DynamoDBDriverParameters struct {
|
||||
Endpoint, Region, TableName string
|
||||
}
|
||||
|
||||
type Blob struct {
|
||||
Digest string `dynamodbav:"Digest,string"`
|
||||
BlobPath []string `dynamodbav:"BlobPath,stringset"`
|
||||
}
|
||||
|
||||
// Use ONLY for tests.
|
||||
func (d *DynamoDBDriver) NewTable(tableName string) error {
|
||||
//nolint:gomnd
|
||||
_, err := d.client.CreateTable(context.TODO(), &dynamodb.CreateTableInput{
|
||||
TableName: &tableName,
|
||||
AttributeDefinitions: []types.AttributeDefinition{
|
||||
{
|
||||
AttributeName: aws.String("Digest"),
|
||||
AttributeType: types.ScalarAttributeTypeS,
|
||||
},
|
||||
},
|
||||
KeySchema: []types.KeySchemaElement{
|
||||
{
|
||||
AttributeName: aws.String("Digest"),
|
||||
KeyType: types.KeyTypeHash,
|
||||
},
|
||||
},
|
||||
ProvisionedThroughput: &types.ProvisionedThroughput{
|
||||
ReadCapacityUnits: aws.Int64(10),
|
||||
WriteCapacityUnits: aws.Int64(5),
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
d.tableName = tableName
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewDynamoDBCache(parameters interface{}, log zlog.Logger) Cache {
|
||||
properParameters, ok := parameters.(DynamoDBDriverParameters)
|
||||
if !ok {
|
||||
panic("Failed type assertion!")
|
||||
}
|
||||
|
||||
// custom endpoint resolver to point to localhost
|
||||
customResolver := aws.EndpointResolverWithOptionsFunc(
|
||||
func(service, region string, options ...interface{}) (aws.Endpoint, error) {
|
||||
return aws.Endpoint{
|
||||
PartitionID: "aws",
|
||||
URL: properParameters.Endpoint,
|
||||
SigningRegion: region,
|
||||
}, nil
|
||||
})
|
||||
|
||||
// Using the SDK's default configuration, loading additional config
|
||||
// and credentials values from the environment variables, shared
|
||||
// credentials, and shared configuration files
|
||||
cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion(properParameters.Region),
|
||||
config.WithEndpointResolverWithOptions(customResolver))
|
||||
if err != nil {
|
||||
log.Error().Msgf("unable to load AWS SDK config for dynamodb, %v", err)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Using the Config value, create the DynamoDB client
|
||||
return &DynamoDBDriver{client: dynamodb.NewFromConfig(cfg), tableName: properParameters.TableName, log: log}
|
||||
}
|
||||
|
||||
func (d *DynamoDBDriver) Name() string {
|
||||
return "dynamodb"
|
||||
}
|
||||
|
||||
// Returns the first path of the blob if it exists.
|
||||
func (d *DynamoDBDriver) GetBlob(digest godigest.Digest) (string, error) {
|
||||
resp, err := d.client.GetItem(context.TODO(), &dynamodb.GetItemInput{
|
||||
TableName: aws.String(d.tableName),
|
||||
Key: map[string]types.AttributeValue{
|
||||
"Digest": &types.AttributeValueMemberS{Value: digest.String()},
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
d.log.Error().Msgf("failed to get blob %v, %v", d.tableName, err)
|
||||
|
||||
return "", err
|
||||
}
|
||||
|
||||
out := Blob{}
|
||||
|
||||
if resp.Item == nil {
|
||||
return "", zerr.ErrCacheMiss
|
||||
}
|
||||
|
||||
_ = attributevalue.UnmarshalMap(resp.Item, &out)
|
||||
|
||||
if len(out.BlobPath) == 0 {
|
||||
return "", nil
|
||||
}
|
||||
|
||||
return out.BlobPath[0], nil
|
||||
}
|
||||
|
||||
func (d *DynamoDBDriver) PutBlob(digest godigest.Digest, path string) error {
|
||||
if path == "" {
|
||||
d.log.Error().Err(zerr.ErrEmptyValue).Str("digest", digest.String()).Msg("empty path provided")
|
||||
|
||||
return zerr.ErrEmptyValue
|
||||
}
|
||||
|
||||
marshaledKey, _ := attributevalue.MarshalMap(map[string]interface{}{"Digest": digest.String()})
|
||||
expression := "ADD BlobPath :i"
|
||||
attrPath := types.AttributeValueMemberSS{Value: []string{path}}
|
||||
|
||||
if _, err := d.client.UpdateItem(context.TODO(), &dynamodb.UpdateItemInput{
|
||||
Key: marshaledKey,
|
||||
TableName: &d.tableName,
|
||||
UpdateExpression: &expression,
|
||||
ExpressionAttributeValues: map[string]types.AttributeValue{":i": &attrPath},
|
||||
}); err != nil {
|
||||
d.log.Error().Err(err)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *DynamoDBDriver) HasBlob(digest godigest.Digest, path string) bool {
|
||||
resp, err := d.client.GetItem(context.TODO(), &dynamodb.GetItemInput{
|
||||
TableName: aws.String(d.tableName),
|
||||
Key: map[string]types.AttributeValue{
|
||||
"Digest": &types.AttributeValueMemberS{Value: digest.String()},
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
d.log.Error().Msgf("failed to get blob %v, %v", d.tableName, err)
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
out := Blob{}
|
||||
|
||||
if resp.Item == nil {
|
||||
d.log.Error().Err(zerr.ErrCacheMiss)
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
_ = attributevalue.UnmarshalMap(resp.Item, &out)
|
||||
|
||||
for _, item := range out.BlobPath {
|
||||
if item == path {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
d.log.Error().Err(zerr.ErrCacheMiss)
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func (d *DynamoDBDriver) DeleteBlob(digest godigest.Digest, path string) error {
|
||||
marshaledKey, _ := attributevalue.MarshalMap(map[string]interface{}{"Digest": digest.String()})
|
||||
|
||||
expression := "DELETE BlobPath :i"
|
||||
attrPath := types.AttributeValueMemberSS{Value: []string{path}}
|
||||
|
||||
_, err := d.client.UpdateItem(context.TODO(), &dynamodb.UpdateItemInput{
|
||||
Key: marshaledKey,
|
||||
TableName: &d.tableName,
|
||||
UpdateExpression: &expression,
|
||||
ExpressionAttributeValues: map[string]types.AttributeValue{":i": &attrPath},
|
||||
})
|
||||
if err != nil {
|
||||
d.log.Error().Err(err).Str("digest", digest.String()).Str("path", path).Msg("unable to delete")
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
result, _ := d.GetBlob(digest)
|
||||
|
||||
if result == "" {
|
||||
d.log.Debug().Str("digest", digest.String()).Str("path", path).Msg("deleting empty bucket")
|
||||
|
||||
_, _ = d.client.DeleteItem(context.TODO(), &dynamodb.DeleteItemInput{
|
||||
Key: marshaledKey,
|
||||
TableName: &d.tableName,
|
||||
})
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
Vendored
+111
@@ -0,0 +1,111 @@
|
||||
package cache_test
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path"
|
||||
"testing"
|
||||
|
||||
godigest "github.com/opencontainers/go-digest"
|
||||
. "github.com/smartystreets/goconvey/convey"
|
||||
|
||||
"zotregistry.io/zot/pkg/log"
|
||||
"zotregistry.io/zot/pkg/storage"
|
||||
"zotregistry.io/zot/pkg/storage/cache"
|
||||
)
|
||||
|
||||
func skipIt(t *testing.T) {
|
||||
t.Helper()
|
||||
|
||||
if os.Getenv("DYNAMODBMOCK_ENDPOINT") == "" {
|
||||
t.Skip("Skipping testing without AWS DynamoDB mock server")
|
||||
}
|
||||
}
|
||||
|
||||
func TestDynamoDB(t *testing.T) {
|
||||
skipIt(t)
|
||||
Convey("Test dynamoDB", t, func(c C) {
|
||||
log := log.NewLogger("debug", "")
|
||||
dir := t.TempDir()
|
||||
|
||||
// bad params
|
||||
|
||||
So(func() {
|
||||
_ = cache.NewDynamoDBCache("bad params", log)
|
||||
}, ShouldPanic)
|
||||
|
||||
keyDigest := godigest.FromString("key")
|
||||
|
||||
cacheDriver, err := storage.Create("dynamodb", cache.DynamoDBDriverParameters{
|
||||
Endpoint: "http://brokenlink",
|
||||
TableName: "BlobTable",
|
||||
Region: "us-east-2",
|
||||
}, log)
|
||||
So(cacheDriver, ShouldNotBeNil)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
val, err := cacheDriver.GetBlob(keyDigest)
|
||||
So(err, ShouldNotBeNil)
|
||||
So(val, ShouldBeEmpty)
|
||||
|
||||
err = cacheDriver.PutBlob(keyDigest, path.Join(dir, "value"))
|
||||
So(err, ShouldNotBeNil)
|
||||
|
||||
exists := cacheDriver.HasBlob(keyDigest, path.Join(dir, "value"))
|
||||
So(exists, ShouldBeFalse)
|
||||
|
||||
err = cacheDriver.DeleteBlob(keyDigest, path.Join(dir, "value"))
|
||||
So(err, ShouldNotBeNil)
|
||||
|
||||
cacheDriver, err = storage.Create("dynamodb", cache.DynamoDBDriverParameters{
|
||||
Endpoint: os.Getenv("DYNAMODBMOCK_ENDPOINT"),
|
||||
TableName: "BlobTable",
|
||||
Region: "us-east-2",
|
||||
}, log)
|
||||
So(cacheDriver, ShouldNotBeNil)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
returnedName := cacheDriver.Name()
|
||||
So(returnedName, ShouldEqual, "dynamodb")
|
||||
|
||||
val, err = cacheDriver.GetBlob(keyDigest)
|
||||
So(err, ShouldNotBeNil)
|
||||
So(val, ShouldBeEmpty)
|
||||
|
||||
err = cacheDriver.PutBlob(keyDigest, "")
|
||||
So(err, ShouldNotBeNil)
|
||||
|
||||
err = cacheDriver.PutBlob(keyDigest, path.Join(dir, "value"))
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
val, err = cacheDriver.GetBlob(keyDigest)
|
||||
So(err, ShouldBeNil)
|
||||
So(val, ShouldNotBeEmpty)
|
||||
|
||||
exists = cacheDriver.HasBlob(keyDigest, path.Join(dir, "value"))
|
||||
So(exists, ShouldBeTrue)
|
||||
|
||||
err = cacheDriver.DeleteBlob(keyDigest, path.Join(dir, "value"))
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
exists = cacheDriver.HasBlob(keyDigest, path.Join(dir, "value"))
|
||||
So(exists, ShouldBeFalse)
|
||||
|
||||
err = cacheDriver.PutBlob(keyDigest, path.Join(dir, "value1"))
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
err = cacheDriver.PutBlob(keyDigest, path.Join(dir, "value2"))
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
err = cacheDriver.DeleteBlob(keyDigest, path.Join(dir, "value1"))
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
exists = cacheDriver.HasBlob(keyDigest, path.Join(dir, "value2"))
|
||||
So(exists, ShouldBeTrue)
|
||||
|
||||
exists = cacheDriver.HasBlob(keyDigest, path.Join(dir, "value1"))
|
||||
So(exists, ShouldBeFalse)
|
||||
|
||||
err = cacheDriver.DeleteBlob(keyDigest, path.Join(dir, "value2"))
|
||||
So(err, ShouldBeNil)
|
||||
})
|
||||
}
|
||||
Reference in New Issue
Block a user