diff --git a/pkg/meta/dynamodb/dynamodb.go b/pkg/meta/dynamodb/dynamodb.go index 3b2f3b81..7151a4ae 100644 --- a/pkg/meta/dynamodb/dynamodb.go +++ b/pkg/meta/dynamodb/dynamodb.go @@ -63,27 +63,27 @@ func New(client *dynamodb.Client, params DBDriverParameters, log log.Logger, return nil, err } - err = dynamoWrapper.createTable(dynamoWrapper.RepoMetaTablename) + err = dynamoWrapper.createTableIfNotExists(dynamoWrapper.RepoMetaTablename) if err != nil { return nil, err } - err = dynamoWrapper.createTable(dynamoWrapper.RepoBlobsTablename) + err = dynamoWrapper.createTableIfNotExists(dynamoWrapper.RepoBlobsTablename) if err != nil { return nil, err } - err = dynamoWrapper.createTable(dynamoWrapper.ImageMetaTablename) + err = dynamoWrapper.createTableIfNotExists(dynamoWrapper.ImageMetaTablename) if err != nil { return nil, err } - err = dynamoWrapper.createTable(dynamoWrapper.UserDataTablename) + err = dynamoWrapper.createTableIfNotExists(dynamoWrapper.UserDataTablename) if err != nil { return nil, err } - err = dynamoWrapper.createTable(dynamoWrapper.APIKeyTablename) + err = dynamoWrapper.createTableIfNotExists(dynamoWrapper.APIKeyTablename) if err != nil { return nil, err } @@ -2266,6 +2266,48 @@ func (dwr *DynamoDB) ResetTable(tableName string) error { return dwr.createTable(tableName) } +func (dwr *DynamoDB) tableExists(tableName string) (bool, error) { + _, err := dwr.Client.DescribeTable(context.Background(), &dynamodb.DescribeTableInput{ + TableName: aws.String(tableName), + }) + if err == nil { + return true, nil + } + + var notFoundErr *types.ResourceNotFoundException + if errors.As(err, ¬FoundErr) { + return false, nil + } + + return false, err +} + +func (dwr *DynamoDB) createTableIfNotExists(tableName string) error { + exists, err := dwr.tableExists(tableName) + if err != nil { + return err + } + + if exists { + return nil + } + + return dwr.createTable(tableName) +} + +func ignoreResourceInUseError(err error) error { + if err == nil { + return nil + } + + var inUseException *types.ResourceInUseException + if errors.As(err, &inUseException) { + return nil + } + + return err +} + func (dwr *DynamoDB) createTable(tableName string) error { _, err := dwr.Client.CreateTable(context.Background(), &dynamodb.CreateTableInput{ TableName: aws.String(tableName), @@ -2283,11 +2325,8 @@ func (dwr *DynamoDB) createTable(tableName string) error { }, BillingMode: types.BillingModePayPerRequest, }) - if err != nil { - inUseException := new(types.ResourceInUseException) - if !errors.As(err, &inUseException) { - return err - } + if err = ignoreResourceInUseError(err); err != nil { + return err } return dwr.waitTableToBeCreated(tableName) @@ -2326,28 +2365,31 @@ func (dwr *DynamoDB) waitTableToBeDeleted(tableName string) error { } func (dwr *DynamoDB) createVersionTable() error { - _, err := dwr.Client.CreateTable(context.Background(), &dynamodb.CreateTableInput{ - TableName: aws.String(dwr.VersionTablename), - AttributeDefinitions: []types.AttributeDefinition{ - { - AttributeName: aws.String("TableKey"), - AttributeType: types.ScalarAttributeTypeS, - }, - }, - KeySchema: []types.KeySchemaElement{ - { - AttributeName: aws.String("TableKey"), - KeyType: types.KeyTypeHash, - }, - }, - BillingMode: types.BillingModePayPerRequest, - }) + exists, err := dwr.tableExists(dwr.VersionTablename) if err != nil { - inUseException := new(types.ResourceInUseException) - if !errors.As(err, &inUseException) { + return err + } + + if !exists { + _, err = dwr.Client.CreateTable(context.Background(), &dynamodb.CreateTableInput{ + TableName: aws.String(dwr.VersionTablename), + AttributeDefinitions: []types.AttributeDefinition{ + { + AttributeName: aws.String("TableKey"), + AttributeType: types.ScalarAttributeTypeS, + }, + }, + KeySchema: []types.KeySchemaElement{ + { + AttributeName: aws.String("TableKey"), + KeyType: types.KeyTypeHash, + }, + }, + BillingMode: types.BillingModePayPerRequest, + }) + if err = ignoreResourceInUseError(err); err != nil { return err } - // Table already exists, continue to ensure version data is set } err = dwr.waitTableToBeCreated(dwr.VersionTablename) diff --git a/pkg/meta/dynamodb/dynamodb_internal_test.go b/pkg/meta/dynamodb/dynamodb_internal_test.go index 83672b2e..9167e00f 100644 --- a/pkg/meta/dynamodb/dynamodb_internal_test.go +++ b/pkg/meta/dynamodb/dynamodb_internal_test.go @@ -1,8 +1,13 @@ package dynamodb import ( + "bytes" "context" + "errors" + "io" + "net/http" "os" + "sync" "testing" "github.com/aws/aws-sdk-go-v2/aws" @@ -116,6 +121,54 @@ func TestWrapperErrors(t *testing.T) { So(actualVersion, ShouldEqual, version.CurrentVersion) }) + Convey("New sets version when version table already exists but version doesn't", func() { + uuid, err := guuid.NewV4() + So(err, ShouldBeNil) + + client := dynamodb.NewFromConfig(cfg, func(o *dynamodb.Options) { + o.BaseEndpoint = aws.String(endpoint) + }) + + params := DBDriverParameters{ + RepoMetaTablename: "RepoMetadataTable" + uuid.String(), + RepoBlobsInfoTablename: "RepoBlobsTable" + uuid.String(), + ImageMetaTablename: "ImageMetaTable" + uuid.String(), + UserDataTablename: "UserDataTable" + uuid.String(), + APIKeyTablename: "ApiKeyTable" + uuid.String(), + VersionTablename: "Version" + uuid.String(), + } + + dynamoWrapper := DynamoDB{ + Client: client, + VersionTablename: params.VersionTablename, + Patches: version.GetDynamoDBPatches(), + Log: log.NewTestLogger(), + } + + err = dynamoWrapper.createTable(params.VersionTablename) + So(err, ShouldBeNil) + + defer func() { + for _, tableName := range []string{ + params.RepoMetaTablename, + params.RepoBlobsInfoTablename, + params.ImageMetaTablename, + params.UserDataTablename, + params.APIKeyTablename, + params.VersionTablename, + } { + _ = dynamoWrapper.deleteTable(tableName) + } + }() + + _, err = New(client, params, log.NewTestLogger()) + So(err, ShouldBeNil) + + actualVersion, err := getVersion(client, params.VersionTablename) + So(err, ShouldBeNil) + So(actualVersion, ShouldEqual, version.CurrentVersion) + }) + Convey("createVersionTable sets version when table exists but version doesn't", func() { uuid, err := guuid.NewV4() So(err, ShouldBeNil) @@ -187,6 +240,46 @@ func TestWrapperErrors(t *testing.T) { So(actualVersion, ShouldEqual, "V2") }) + Convey("createVersionTable tolerates concurrent CreateTable", func() { + uuid, err := guuid.NewV4() + So(err, ShouldBeNil) + versionTablename := "Version" + uuid.String() + + dynamoWrapper := DynamoDB{ + Client: dynamodb.NewFromConfig(cfg, func(o *dynamodb.Options) { + o.BaseEndpoint = aws.String(endpoint) + }), + VersionTablename: versionTablename, + Patches: version.GetDynamoDBPatches(), + Log: log.NewTestLogger(), + } + + var wg sync.WaitGroup + + errs := make(chan error, 2) + + for range 2 { + wg.Go(func() { + errs <- dynamoWrapper.createVersionTable() + }) + } + + wg.Wait() + close(errs) + + for err := range errs { + So(err, ShouldBeNil) + } + + defer func() { + _ = dynamoWrapper.deleteTable(versionTablename) + }() + + actualVersion, err := getVersion(dynamoWrapper.Client, versionTablename) + So(err, ShouldBeNil) + So(actualVersion, ShouldEqual, version.CurrentVersion) + }) + Convey("createVersionTable is idempotent - can be called multiple times", func() { uuid, err := guuid.NewV4() So(err, ShouldBeNil) @@ -220,6 +313,142 @@ func TestWrapperErrors(t *testing.T) { So(actualVersion, ShouldEqual, version.CurrentVersion) }) }) + + Convey("createTableIfNotExists", t, func() { + cfg, err := config.LoadDefaultConfig(context.Background(), config.WithRegion(region)) + So(err, ShouldBeNil) + + uuid, err := guuid.NewV4() + So(err, ShouldBeNil) + tableName := "RepoMetadataTable" + uuid.String() + + dynamoWrapper := DynamoDB{ + Client: dynamodb.NewFromConfig(cfg, func(o *dynamodb.Options) { + o.BaseEndpoint = aws.String(endpoint) + }), + Log: log.NewTestLogger(), + } + + err = dynamoWrapper.createTable(tableName) + So(err, ShouldBeNil) + defer func() { + _ = dynamoWrapper.deleteTable(tableName) + }() + + err = dynamoWrapper.createTableIfNotExists(tableName) + So(err, ShouldBeNil) + }) +} + +func TestIgnoreResourceInUseError(t *testing.T) { + Convey("ignoreResourceInUseError", t, func() { + So(ignoreResourceInUseError(nil), ShouldBeNil) + + inUseErr := &types.ResourceInUseException{Message: aws.String("table exists")} + So(ignoreResourceInUseError(inUseErr), ShouldBeNil) + + otherErr := errors.New("create table failed") + So(ignoreResourceInUseError(otherErr), ShouldEqual, otherErr) + }) +} + +func TestCreateVersionTableCreateErrors(t *testing.T) { + const ( + region = "us-east-2" + versionTablename = "VersionTest" + describeTableOp = "DynamoDB_20120810.DescribeTable" + createTableOp = "DynamoDB_20120810.CreateTable" + updateItemOp = "DynamoDB_20120810.UpdateItem" + resourceNotFound = `{"__type":"com.amazon.coral.service#ResourceNotFoundException","message":"not found"}` + resourceInUse = `{"__type":"com.amazon.coral.service#ResourceInUseException","message":"already exists"}` + internalError = `{"__type":"com.amazon.coral.service#InternalServerError","message":"boom"}` + activeTable = `{"Table":{"TableName":"VersionTest","TableStatus":"ACTIVE"}}` + ) + + newTestClient := func(handler func(target string) (int, string)) *dynamodb.Client { + cfg, err := config.LoadDefaultConfig(context.Background(), + config.WithRegion(region), + config.WithCredentialsProvider(aws.AnonymousCredentials{}), + config.WithHTTPClient(&http.Client{ + Transport: roundTripperFunc(func(req *http.Request) (*http.Response, error) { + target := req.Header.Get("X-Amz-Target") + status, body := handler(target) + + return &http.Response{ + StatusCode: status, + Body: io.NopCloser(bytes.NewBufferString(body)), + Header: http.Header{"Content-Type": []string{"application/x-amz-json-1.0"}}, + Request: req, + }, nil + }), + }), + ) + So(err, ShouldBeNil) + + return dynamodb.NewFromConfig(cfg, func(o *dynamodb.Options) { + o.BaseEndpoint = aws.String("http://dynamodb.test") + }) + } + + Convey("createVersionTable propagates non-ResourceInUse CreateTable errors", t, func() { + client := newTestClient(func(target string) (int, string) { + switch target { + case describeTableOp: + return http.StatusBadRequest, resourceNotFound + case createTableOp: + return http.StatusInternalServerError, internalError + default: + return http.StatusInternalServerError, internalError + } + }) + + dynamoWrapper := DynamoDB{ + Client: client, + VersionTablename: versionTablename, + Log: log.NewTestLogger(), + } + + err := dynamoWrapper.createVersionTable() + So(err, ShouldNotBeNil) + }) + + Convey("createVersionTable tolerates ResourceInUseException from CreateTable", t, func() { + describeCount := 0 + + client := newTestClient(func(target string) (int, string) { + switch target { + case describeTableOp: + describeCount++ + + if describeCount == 1 { + return http.StatusBadRequest, resourceNotFound + } + + return http.StatusOK, activeTable + case createTableOp: + return http.StatusBadRequest, resourceInUse + case updateItemOp: + return http.StatusOK, `{}` + default: + return http.StatusInternalServerError, internalError + } + }) + + dynamoWrapper := DynamoDB{ + Client: client, + VersionTablename: versionTablename, + Log: log.NewTestLogger(), + } + + err := dynamoWrapper.createVersionTable() + So(err, ShouldBeNil) + }) +} + +type roundTripperFunc func(*http.Request) (*http.Response, error) + +func (f roundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) { + return f(req) } // Helper function to get version from DynamoDB