mirror of
https://github.com/project-zot/zot.git
synced 2026-06-15 11:37:56 +08:00
fix: skip DynamoDB table creation when tables exist (#4120)
Signed-off-by: Andrei Aaron <andreifdaaron@gmail.com>
This commit is contained in:
@@ -63,27 +63,27 @@ func New(client *dynamodb.Client, params DBDriverParameters, log log.Logger,
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = dynamoWrapper.createTable(dynamoWrapper.RepoMetaTablename)
|
||||
err = dynamoWrapper.createTableIfNotExists(dynamoWrapper.RepoMetaTablename)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = dynamoWrapper.createTable(dynamoWrapper.RepoBlobsTablename)
|
||||
err = dynamoWrapper.createTableIfNotExists(dynamoWrapper.RepoBlobsTablename)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = dynamoWrapper.createTable(dynamoWrapper.ImageMetaTablename)
|
||||
err = dynamoWrapper.createTableIfNotExists(dynamoWrapper.ImageMetaTablename)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = dynamoWrapper.createTable(dynamoWrapper.UserDataTablename)
|
||||
err = dynamoWrapper.createTableIfNotExists(dynamoWrapper.UserDataTablename)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = dynamoWrapper.createTable(dynamoWrapper.APIKeyTablename)
|
||||
err = dynamoWrapper.createTableIfNotExists(dynamoWrapper.APIKeyTablename)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -2266,6 +2266,48 @@ func (dwr *DynamoDB) ResetTable(tableName string) error {
|
||||
return dwr.createTable(tableName)
|
||||
}
|
||||
|
||||
func (dwr *DynamoDB) tableExists(tableName string) (bool, error) {
|
||||
_, err := dwr.Client.DescribeTable(context.Background(), &dynamodb.DescribeTableInput{
|
||||
TableName: aws.String(tableName),
|
||||
})
|
||||
if err == nil {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
var notFoundErr *types.ResourceNotFoundException
|
||||
if errors.As(err, ¬FoundErr) {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
return false, err
|
||||
}
|
||||
|
||||
func (dwr *DynamoDB) createTableIfNotExists(tableName string) error {
|
||||
exists, err := dwr.tableExists(tableName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if exists {
|
||||
return nil
|
||||
}
|
||||
|
||||
return dwr.createTable(tableName)
|
||||
}
|
||||
|
||||
func ignoreResourceInUseError(err error) error {
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
var inUseException *types.ResourceInUseException
|
||||
if errors.As(err, &inUseException) {
|
||||
return nil
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (dwr *DynamoDB) createTable(tableName string) error {
|
||||
_, err := dwr.Client.CreateTable(context.Background(), &dynamodb.CreateTableInput{
|
||||
TableName: aws.String(tableName),
|
||||
@@ -2283,11 +2325,8 @@ func (dwr *DynamoDB) createTable(tableName string) error {
|
||||
},
|
||||
BillingMode: types.BillingModePayPerRequest,
|
||||
})
|
||||
if err != nil {
|
||||
inUseException := new(types.ResourceInUseException)
|
||||
if !errors.As(err, &inUseException) {
|
||||
return err
|
||||
}
|
||||
if err = ignoreResourceInUseError(err); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return dwr.waitTableToBeCreated(tableName)
|
||||
@@ -2326,28 +2365,31 @@ func (dwr *DynamoDB) waitTableToBeDeleted(tableName string) error {
|
||||
}
|
||||
|
||||
func (dwr *DynamoDB) createVersionTable() error {
|
||||
_, err := dwr.Client.CreateTable(context.Background(), &dynamodb.CreateTableInput{
|
||||
TableName: aws.String(dwr.VersionTablename),
|
||||
AttributeDefinitions: []types.AttributeDefinition{
|
||||
{
|
||||
AttributeName: aws.String("TableKey"),
|
||||
AttributeType: types.ScalarAttributeTypeS,
|
||||
},
|
||||
},
|
||||
KeySchema: []types.KeySchemaElement{
|
||||
{
|
||||
AttributeName: aws.String("TableKey"),
|
||||
KeyType: types.KeyTypeHash,
|
||||
},
|
||||
},
|
||||
BillingMode: types.BillingModePayPerRequest,
|
||||
})
|
||||
exists, err := dwr.tableExists(dwr.VersionTablename)
|
||||
if err != nil {
|
||||
inUseException := new(types.ResourceInUseException)
|
||||
if !errors.As(err, &inUseException) {
|
||||
return err
|
||||
}
|
||||
|
||||
if !exists {
|
||||
_, err = dwr.Client.CreateTable(context.Background(), &dynamodb.CreateTableInput{
|
||||
TableName: aws.String(dwr.VersionTablename),
|
||||
AttributeDefinitions: []types.AttributeDefinition{
|
||||
{
|
||||
AttributeName: aws.String("TableKey"),
|
||||
AttributeType: types.ScalarAttributeTypeS,
|
||||
},
|
||||
},
|
||||
KeySchema: []types.KeySchemaElement{
|
||||
{
|
||||
AttributeName: aws.String("TableKey"),
|
||||
KeyType: types.KeyTypeHash,
|
||||
},
|
||||
},
|
||||
BillingMode: types.BillingModePayPerRequest,
|
||||
})
|
||||
if err = ignoreResourceInUseError(err); err != nil {
|
||||
return err
|
||||
}
|
||||
// Table already exists, continue to ensure version data is set
|
||||
}
|
||||
|
||||
err = dwr.waitTableToBeCreated(dwr.VersionTablename)
|
||||
|
||||
@@ -1,8 +1,13 @@
|
||||
package dynamodb
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
@@ -116,6 +121,54 @@ func TestWrapperErrors(t *testing.T) {
|
||||
So(actualVersion, ShouldEqual, version.CurrentVersion)
|
||||
})
|
||||
|
||||
Convey("New sets version when version table already exists but version doesn't", func() {
|
||||
uuid, err := guuid.NewV4()
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
client := dynamodb.NewFromConfig(cfg, func(o *dynamodb.Options) {
|
||||
o.BaseEndpoint = aws.String(endpoint)
|
||||
})
|
||||
|
||||
params := DBDriverParameters{
|
||||
RepoMetaTablename: "RepoMetadataTable" + uuid.String(),
|
||||
RepoBlobsInfoTablename: "RepoBlobsTable" + uuid.String(),
|
||||
ImageMetaTablename: "ImageMetaTable" + uuid.String(),
|
||||
UserDataTablename: "UserDataTable" + uuid.String(),
|
||||
APIKeyTablename: "ApiKeyTable" + uuid.String(),
|
||||
VersionTablename: "Version" + uuid.String(),
|
||||
}
|
||||
|
||||
dynamoWrapper := DynamoDB{
|
||||
Client: client,
|
||||
VersionTablename: params.VersionTablename,
|
||||
Patches: version.GetDynamoDBPatches(),
|
||||
Log: log.NewTestLogger(),
|
||||
}
|
||||
|
||||
err = dynamoWrapper.createTable(params.VersionTablename)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
defer func() {
|
||||
for _, tableName := range []string{
|
||||
params.RepoMetaTablename,
|
||||
params.RepoBlobsInfoTablename,
|
||||
params.ImageMetaTablename,
|
||||
params.UserDataTablename,
|
||||
params.APIKeyTablename,
|
||||
params.VersionTablename,
|
||||
} {
|
||||
_ = dynamoWrapper.deleteTable(tableName)
|
||||
}
|
||||
}()
|
||||
|
||||
_, err = New(client, params, log.NewTestLogger())
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
actualVersion, err := getVersion(client, params.VersionTablename)
|
||||
So(err, ShouldBeNil)
|
||||
So(actualVersion, ShouldEqual, version.CurrentVersion)
|
||||
})
|
||||
|
||||
Convey("createVersionTable sets version when table exists but version doesn't", func() {
|
||||
uuid, err := guuid.NewV4()
|
||||
So(err, ShouldBeNil)
|
||||
@@ -187,6 +240,46 @@ func TestWrapperErrors(t *testing.T) {
|
||||
So(actualVersion, ShouldEqual, "V2")
|
||||
})
|
||||
|
||||
Convey("createVersionTable tolerates concurrent CreateTable", func() {
|
||||
uuid, err := guuid.NewV4()
|
||||
So(err, ShouldBeNil)
|
||||
versionTablename := "Version" + uuid.String()
|
||||
|
||||
dynamoWrapper := DynamoDB{
|
||||
Client: dynamodb.NewFromConfig(cfg, func(o *dynamodb.Options) {
|
||||
o.BaseEndpoint = aws.String(endpoint)
|
||||
}),
|
||||
VersionTablename: versionTablename,
|
||||
Patches: version.GetDynamoDBPatches(),
|
||||
Log: log.NewTestLogger(),
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
errs := make(chan error, 2)
|
||||
|
||||
for range 2 {
|
||||
wg.Go(func() {
|
||||
errs <- dynamoWrapper.createVersionTable()
|
||||
})
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
close(errs)
|
||||
|
||||
for err := range errs {
|
||||
So(err, ShouldBeNil)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
_ = dynamoWrapper.deleteTable(versionTablename)
|
||||
}()
|
||||
|
||||
actualVersion, err := getVersion(dynamoWrapper.Client, versionTablename)
|
||||
So(err, ShouldBeNil)
|
||||
So(actualVersion, ShouldEqual, version.CurrentVersion)
|
||||
})
|
||||
|
||||
Convey("createVersionTable is idempotent - can be called multiple times", func() {
|
||||
uuid, err := guuid.NewV4()
|
||||
So(err, ShouldBeNil)
|
||||
@@ -220,6 +313,142 @@ func TestWrapperErrors(t *testing.T) {
|
||||
So(actualVersion, ShouldEqual, version.CurrentVersion)
|
||||
})
|
||||
})
|
||||
|
||||
Convey("createTableIfNotExists", t, func() {
|
||||
cfg, err := config.LoadDefaultConfig(context.Background(), config.WithRegion(region))
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
uuid, err := guuid.NewV4()
|
||||
So(err, ShouldBeNil)
|
||||
tableName := "RepoMetadataTable" + uuid.String()
|
||||
|
||||
dynamoWrapper := DynamoDB{
|
||||
Client: dynamodb.NewFromConfig(cfg, func(o *dynamodb.Options) {
|
||||
o.BaseEndpoint = aws.String(endpoint)
|
||||
}),
|
||||
Log: log.NewTestLogger(),
|
||||
}
|
||||
|
||||
err = dynamoWrapper.createTable(tableName)
|
||||
So(err, ShouldBeNil)
|
||||
defer func() {
|
||||
_ = dynamoWrapper.deleteTable(tableName)
|
||||
}()
|
||||
|
||||
err = dynamoWrapper.createTableIfNotExists(tableName)
|
||||
So(err, ShouldBeNil)
|
||||
})
|
||||
}
|
||||
|
||||
func TestIgnoreResourceInUseError(t *testing.T) {
|
||||
Convey("ignoreResourceInUseError", t, func() {
|
||||
So(ignoreResourceInUseError(nil), ShouldBeNil)
|
||||
|
||||
inUseErr := &types.ResourceInUseException{Message: aws.String("table exists")}
|
||||
So(ignoreResourceInUseError(inUseErr), ShouldBeNil)
|
||||
|
||||
otherErr := errors.New("create table failed")
|
||||
So(ignoreResourceInUseError(otherErr), ShouldEqual, otherErr)
|
||||
})
|
||||
}
|
||||
|
||||
func TestCreateVersionTableCreateErrors(t *testing.T) {
|
||||
const (
|
||||
region = "us-east-2"
|
||||
versionTablename = "VersionTest"
|
||||
describeTableOp = "DynamoDB_20120810.DescribeTable"
|
||||
createTableOp = "DynamoDB_20120810.CreateTable"
|
||||
updateItemOp = "DynamoDB_20120810.UpdateItem"
|
||||
resourceNotFound = `{"__type":"com.amazon.coral.service#ResourceNotFoundException","message":"not found"}`
|
||||
resourceInUse = `{"__type":"com.amazon.coral.service#ResourceInUseException","message":"already exists"}`
|
||||
internalError = `{"__type":"com.amazon.coral.service#InternalServerError","message":"boom"}`
|
||||
activeTable = `{"Table":{"TableName":"VersionTest","TableStatus":"ACTIVE"}}`
|
||||
)
|
||||
|
||||
newTestClient := func(handler func(target string) (int, string)) *dynamodb.Client {
|
||||
cfg, err := config.LoadDefaultConfig(context.Background(),
|
||||
config.WithRegion(region),
|
||||
config.WithCredentialsProvider(aws.AnonymousCredentials{}),
|
||||
config.WithHTTPClient(&http.Client{
|
||||
Transport: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
|
||||
target := req.Header.Get("X-Amz-Target")
|
||||
status, body := handler(target)
|
||||
|
||||
return &http.Response{
|
||||
StatusCode: status,
|
||||
Body: io.NopCloser(bytes.NewBufferString(body)),
|
||||
Header: http.Header{"Content-Type": []string{"application/x-amz-json-1.0"}},
|
||||
Request: req,
|
||||
}, nil
|
||||
}),
|
||||
}),
|
||||
)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
return dynamodb.NewFromConfig(cfg, func(o *dynamodb.Options) {
|
||||
o.BaseEndpoint = aws.String("http://dynamodb.test")
|
||||
})
|
||||
}
|
||||
|
||||
Convey("createVersionTable propagates non-ResourceInUse CreateTable errors", t, func() {
|
||||
client := newTestClient(func(target string) (int, string) {
|
||||
switch target {
|
||||
case describeTableOp:
|
||||
return http.StatusBadRequest, resourceNotFound
|
||||
case createTableOp:
|
||||
return http.StatusInternalServerError, internalError
|
||||
default:
|
||||
return http.StatusInternalServerError, internalError
|
||||
}
|
||||
})
|
||||
|
||||
dynamoWrapper := DynamoDB{
|
||||
Client: client,
|
||||
VersionTablename: versionTablename,
|
||||
Log: log.NewTestLogger(),
|
||||
}
|
||||
|
||||
err := dynamoWrapper.createVersionTable()
|
||||
So(err, ShouldNotBeNil)
|
||||
})
|
||||
|
||||
Convey("createVersionTable tolerates ResourceInUseException from CreateTable", t, func() {
|
||||
describeCount := 0
|
||||
|
||||
client := newTestClient(func(target string) (int, string) {
|
||||
switch target {
|
||||
case describeTableOp:
|
||||
describeCount++
|
||||
|
||||
if describeCount == 1 {
|
||||
return http.StatusBadRequest, resourceNotFound
|
||||
}
|
||||
|
||||
return http.StatusOK, activeTable
|
||||
case createTableOp:
|
||||
return http.StatusBadRequest, resourceInUse
|
||||
case updateItemOp:
|
||||
return http.StatusOK, `{}`
|
||||
default:
|
||||
return http.StatusInternalServerError, internalError
|
||||
}
|
||||
})
|
||||
|
||||
dynamoWrapper := DynamoDB{
|
||||
Client: client,
|
||||
VersionTablename: versionTablename,
|
||||
Log: log.NewTestLogger(),
|
||||
}
|
||||
|
||||
err := dynamoWrapper.createVersionTable()
|
||||
So(err, ShouldBeNil)
|
||||
})
|
||||
}
|
||||
|
||||
type roundTripperFunc func(*http.Request) (*http.Response, error)
|
||||
|
||||
func (f roundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
return f(req)
|
||||
}
|
||||
|
||||
// Helper function to get version from DynamoDB
|
||||
|
||||
Reference in New Issue
Block a user