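Creating every resource by hand when developing serverless applications is very inefficient, so a framework is usually used to speed up development. This article shows how to create and use AWS DynamoDB with the Serverless Framework.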

Creating a DynamoDB Table
Suppose we want to create a DynamoDB table for the following student score data:
Student ID, Subject, Score, Date
--------------------------------
9901, Math, 90, 2021-11-01
9901, English, 80, 2021-11-02
9902, Math, 92, 2021-11-01
9903, Math, 63, 2021-11-01
9904, Math, 90, 2021-11-01
The corresponding DynamoDB resource is then defined in serverless.yml:
```yaml
service: service-02

provider:
  name: aws
  region: eu-west-1
  runtime: nodejs14.x
  lambdaHashingVersion: 20201221
  profile: default

resources:
  Resources:
    MyDynamoDbTable:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: ${self:custom.tableName}
        AttributeDefinitions:
          - AttributeName: StudentID
            AttributeType: S
          - AttributeName: Subject
            AttributeType: S
          - AttributeName: Date
            AttributeType: S
        KeySchema:
          - AttributeName: StudentID
            KeyType: HASH
          - AttributeName: Subject
            KeyType: RANGE
        GlobalSecondaryIndexes:
          - IndexName: Dates
            KeySchema:
              - AttributeName: Date
                KeyType: HASH
              - AttributeName: Subject
                KeyType: RANGE
            Projection:
              ProjectionType: ALL
        BillingMode: PAY_PER_REQUEST

custom:
  tableName: score-table
```
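Note that in KeySchema, HASH denotes the partition key and RANGE denotes the sort key (also called the range key). AttributeDefinitions only lists attributes used as keys of the table or of an index, which is why Score does not appear there.
In the GSI definition, ProjectionType: ALL means that all attributes of the table are projected (copied) into the index.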
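To make the key roles concrete, here is a minimal in-memory model in TypeScript. It is only an illustration, not a DynamoDB client: items are addressed by the (StudentID, Subject) pair, a query fixes one partition key value and orders the results by the sort key, and the Dates GSI regroups the same rows by Date. The helper `query` and the `StringKey` type are hypothetical.

```typescript
// In-memory illustration of the score table's keys; not a DynamoDB client.
interface Score {
  StudentID: string; // table partition key (HASH)
  Subject: string;   // table sort key (RANGE), also the GSI sort key
  Score: number;     // non-key attribute
  Date: string;      // partition key of the "Dates" GSI
}
type StringKey = "StudentID" | "Subject" | "Date";

const rows: Score[] = [
  { StudentID: "9901", Subject: "Math", Score: 90, Date: "2021-11-01" },
  { StudentID: "9901", Subject: "English", Score: 80, Date: "2021-11-02" },
  { StudentID: "9902", Subject: "Math", Score: 92, Date: "2021-11-01" },
  { StudentID: "9903", Subject: "Math", Score: 63, Date: "2021-11-01" },
  { StudentID: "9904", Subject: "Math", Score: 90, Date: "2021-11-01" },
];

// Base table: the (StudentID, Subject) pair identifies exactly one item,
// so writing the same pair again would overwrite it.
const table = new Map<string, Score>();
for (const r of rows) table.set(`${r.StudentID}#${r.Subject}`, r);

// A query fixes one partition key value and returns items ordered by the sort key.
// Plain string comparison matches DynamoDB's byte order for these ASCII values.
function query(items: Score[], hashKey: StringKey, hashValue: string, rangeKey: StringKey): Score[] {
  return items
    .filter((item) => item[hashKey] === hashValue)
    .sort((a, b) => (a[rangeKey] < b[rangeKey] ? -1 : a[rangeKey] > b[rangeKey] ? 1 : 0));
}

const all = Array.from(table.values());
const byStudent = query(all, "StudentID", "9901", "Subject"); // base table
const byDate = query(all, "Date", "2021-11-01", "Subject");   // "Dates" GSI
console.log(byStudent.map((s) => s.Subject)); // [ 'English', 'Math' ]
console.log(byDate.length); // 4
```

Unlike the table's primary key, GSI keys do not have to be unique, which is why four rows can share the Date/Subject pair 2021-11-01/Math in the index.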
Run the following command to deploy to AWS:
```
sls deploy
```
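Using TTL (Time To Live)
TTL lets you give individual items a limited lifetime. Enable it by adding a TimeToLiveSpecification to the table's Properties, for example:
```yaml
TimeToLiveSpecification:
  AttributeName: TimeToLive
  Enabled: true
BillingMode: PAY_PER_REQUEST
```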
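With this in place, store the expiry time in the TimeToLive attribute when inserting an item, as a Number holding a Unix epoch timestamp in seconds; DynamoDB then deletes the item automatically once that time has passed. Note: the exact deletion time is not guaranteed. Deletion is usually quick, but can take up to 48 hours.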
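A minimal sketch of computing that timestamp, assuming items are built in TypeScript before being written with an SDK client (the write itself is omitted). The names `ttlEpochSeconds` and `buildScoreItem` and the 30-day lifetime are hypothetical; the point is that JavaScript's Date works in milliseconds while the TTL attribute must hold seconds.

```typescript
// Hypothetical item shape for the score table, with a TTL attribute added.
interface ScoreItem {
  StudentID: string;
  Subject: string;
  Score: number;
  Date: string;
  TimeToLive: number; // Unix epoch time in seconds, stored as a Number
}

const SECONDS_PER_DAY = 24 * 60 * 60;

// Date.getTime() returns milliseconds, so convert to whole seconds first.
function ttlEpochSeconds(now: Date, ttlDays: number): number {
  return Math.floor(now.getTime() / 1000) + ttlDays * SECONDS_PER_DAY;
}

function buildScoreItem(
  studentId: string,
  subject: string,
  score: number,
  date: string,
  now: Date,
  ttlDays: number,
): ScoreItem {
  return {
    StudentID: studentId,
    Subject: subject,
    Score: score,
    Date: date,
    TimeToLive: ttlEpochSeconds(now, ttlDays),
  };
}

// Expire 30 days after 2021-11-01T00:00:00Z, i.e. at 2021-12-01T00:00:00Z
const item = buildScoreItem("9901", "Math", 90, "2021-11-01", new Date("2021-11-01T00:00:00Z"), 30);
console.log(item.TimeToLive); // 1638316800
```

Storing milliseconds by mistake would put the expiry tens of thousands of years in the future, so the item would effectively never expire.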
Using DynamoDB Streams
Declaring the stream in the table definition
First, specify a stream when defining the DynamoDB table in serverless.yml:
```yaml
resources:
  Resources:
    DemoTable:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: demo
        AttributeDefinitions:
          - AttributeName: PK
            AttributeType: S
        KeySchema:
          - AttributeName: PK
            KeyType: HASH
        ProvisionedThroughput:
          ReadCapacityUnits: 5
          WriteCapacityUnits: 5
        StreamSpecification:
          StreamViewType: NEW_AND_OLD_IMAGES
```
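StreamViewType determines what each stream record contains:
- KEYS_ONLY: only the key attributes of the modified item
- NEW_IMAGE: the entire item as it appears after the modification
- OLD_IMAGE: the entire item as it appeared before the modification
- NEW_AND_OLD_IMAGES: both the new and the old image of the item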
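Which images a record actually carries depends on both the view type and the kind of change. The sketch below encodes that rule; `recordFields` is a hypothetical helper written from the documented semantics, not taken from any SDK. An INSERT has no old image and a REMOVE has no new image, which matters for any handler that reads `NewImage`.

```typescript
type StreamViewType = "KEYS_ONLY" | "NEW_IMAGE" | "OLD_IMAGE" | "NEW_AND_OLD_IMAGES";
type EventName = "INSERT" | "MODIFY" | "REMOVE";

interface RecordFields {
  keys: boolean;     // Keys is always present
  newImage: boolean; // item after the change
  oldImage: boolean; // item before the change
}

// Hypothetical helper: which fields a stream record carries for a given view type and event.
function recordFields(view: StreamViewType, event: EventName): RecordFields {
  const wantsNew = view === "NEW_IMAGE" || view === "NEW_AND_OLD_IMAGES";
  const wantsOld = view === "OLD_IMAGE" || view === "NEW_AND_OLD_IMAGES";
  return {
    keys: true,
    // An inserted item has no "before" state, and a removed item has no "after" state.
    newImage: wantsNew && event !== "REMOVE",
    oldImage: wantsOld && event !== "INSERT",
  };
}

console.log(recordFields("NEW_AND_OLD_IMAGES", "REMOVE"));
// { keys: true, newImage: false, oldImage: true }
```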
Granting the Lambda access to the DB Stream
```yaml
provider:
  iamRoleStatements:
    - Effect: Allow
      Action:
        - dynamodb:DescribeStream
        - dynamodb:GetRecords
        - dynamodb:GetShardIterator
        - dynamodb:ListStreams
      Resource: arn:aws:dynamodb:eu-west-1:ACCOUNT_ID:table/YOUR_TABLE_NAME/stream/YOUR_STREAM_NAME
```
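Note: the Resource value above is the ARN of the DynamoDB Stream, not the ARN of the DynamoDB table.
Deploy the resources to AWS first; you can then look up the stream's label, i.e. the last segment of the stream ARN, for example 2021-11-20T22:55:25.566. It is returned, for instance, as LatestStreamLabel (alongside LatestStreamArn) by aws dynamodb describe-table.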
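Because the label is a timestamp that itself contains colons, it cannot be recovered by splitting the ARN on ":". A small sketch that extracts it with a regular expression instead; `parseStreamArn` is a hypothetical helper and 123456789012 is a placeholder account ID.

```typescript
interface StreamArnParts {
  partition: string;
  region: string;
  accountId: string;
  tableName: string;
  streamLabel: string;
}

// Anchor on the fixed "table/<name>/stream/" structure; the label is everything after it.
const STREAM_ARN = /^arn:([^:]+):dynamodb:([^:]+):(\d{12}):table\/([^/]+)\/stream\/(.+)$/;

// Hypothetical helper: returns null for anything that is not a stream ARN.
function parseStreamArn(arn: string): StreamArnParts | null {
  const m = STREAM_ARN.exec(arn);
  if (!m) return null; // e.g. a plain table ARN without "/stream/..."
  const [, partition, region, accountId, tableName, streamLabel] = m;
  return { partition, region, accountId, tableName, streamLabel };
}

const parts = parseStreamArn(
  "arn:aws:dynamodb:eu-west-1:123456789012:table/demo/stream/2021-11-20T22:55:25.566",
);
console.log(parts?.streamLabel); // 2021-11-20T22:55:25.566
console.log(parseStreamArn("arn:aws:dynamodb:eu-west-1:123456789012:table/demo")); // null
```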
Associating the Lambda with the DynamoDB Stream
```yaml
streamFunction:
  handler: streamFunction.handler
  events:
    - stream:
        type: dynamodb
        arn: 'arn:aws:dynamodb:REGION:ACCOUNT_ID:table/YOUR_TABLE/stream/YOUR_STREAM'
        batchSize: 1
        maximumRetryAttempts: 3
```
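Here batchSize: 1 means each invocation receives at most one record, and maximumRetryAttempts: 3 caps how often a failing batch is retried. The local simulation below sketches that rule; it is not Lambda's implementation, and `invokeWithRetries` is hypothetical. The batch is attempted once and then retried up to maximumRetryAttempts times, so at most four invocations in total.

```typescript
// Local simulation of stream retry behaviour: one attempt plus up to N retries.
function invokeWithRetries(
  processBatch: (attempt: number) => boolean, // returns true on success
  maximumRetryAttempts: number,
): { succeeded: boolean; attempts: number } {
  let attempts = 0;
  while (attempts <= maximumRetryAttempts) {
    attempts += 1;
    if (processBatch(attempts)) return { succeeded: true, attempts };
  }
  // At this point Lambda would discard the batch (or hand it to an on-failure destination, if configured)
  return { succeeded: false, attempts };
}

// A handler that always fails is invoked 1 + 3 = 4 times with maximumRetryAttempts: 3.
console.log(invokeWithRetries(() => false, 3)); // { succeeded: false, attempts: 4 }
// A handler that recovers on the second attempt stops being retried there.
console.log(invokeWithRetries((n) => n >= 2, 3)); // { succeeded: true, attempts: 2 }
```

Retries are only triggered when the handler throws. A handler that catches and logs every error, as the one later in this article does, reports success and is never retried.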
Alternatively, build the ARN with Fn::Join:
```yaml
events:
  - stream:
      type: dynamodb
      arn:
        Fn::Join:
          - ""
          - - "arn:aws:dynamodb:"
            - ${self:provider.region}
            - ":"
            - !Sub ${AWS::AccountId}
            - ":table/"
            - ${self:custom.dynamo.myTable}
            - "/stream/"
            - ${env:MY_TABLE_STREAM}
      batchSize: 1
      maximumRetryAttempts: 3
```
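The stream label then has to be defined in .env (depending on the Serverless Framework version, .env files are only loaded when useDotenv: true is set in serverless.yml):
```
MY_TABLE_STREAM=xxxxxxxxxxxxxxx
```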
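For reference, the Fn::Join above simply concatenates its list with an empty delimiter. The sketch below reproduces that concatenation; `buildStreamArn` is hypothetical, 123456789012 is a placeholder account ID, and the empty-label check is an added safeguard, since an empty label would leave an ARN ending in "/stream/".

```typescript
// Mirrors the Fn::Join above: the pieces are concatenated with "" as the delimiter.
function buildStreamArn(
  region: string,
  accountId: string,
  tableName: string,
  streamLabel: string,
): string {
  // An empty label would produce an invalid ARN ending in "/stream/"
  if (!streamLabel) throw new Error("stream label is empty");
  return ["arn:aws:dynamodb:", region, ":", accountId, ":table/", tableName, "/stream/", streamLabel].join("");
}

const arn = buildStreamArn("eu-west-1", "123456789012", "demo", "2021-11-20T22:55:25.566");
console.log(arn); // arn:aws:dynamodb:eu-west-1:123456789012:table/demo/stream/2021-11-20T22:55:25.566
```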
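If the DynamoDB table and the DB Stream Lambda are defined in the same serverless.yml, you can instead reference the table's logical resource name directly and let CloudFormation resolve the stream ARN through the table's StreamArn attribute (the Arn attribute would return the table ARN, not the stream ARN):
```yaml
- stream:
    type: dynamodb
    arn:
      Fn::GetAtt:
        - MyTable
        - StreamArn
```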
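Sending data from a DB Stream to OpenSearch
In this case the function's package must also include the OpenSearch client library and the modules it depends on: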
```yaml
DbStreamListener:
  handler: src/functions/MyDbStreamListener.handler
  package:
    patterns:
      - 'node_modules/@aws-sdk/credential-provider-ini/**'
      - 'node_modules/@aws-sdk/credential-provider-node/**'
      - 'node_modules/@aws-sdk/credential-provider-process/**'
      - 'node_modules/@aws-sdk/is-array-buffer/**'
      - 'node_modules/@aws-sdk/util-base64-node/**'
      - 'node_modules/@aws-sdk/util-body-length-node/**'
      - 'node_modules/@aws-sdk/util-buffer-from/**'
      - 'node_modules/@aws-sdk/util-config-provider/**'
      - 'node_modules/@aws-sdk/util-utf8-node/**'
      - 'node_modules/@aws-sdk/util-uri-escape/**'
      - 'node_modules/@opensearch-project/opensearch/**'
      - 'node_modules/aws4/**'
      - 'node_modules/aws-opensearch-connector/**'
      - 'node_modules/debug/**'
      - 'node_modules/hpagent/**'
      - 'node_modules/ms/**'
      - 'node_modules/secure-json-parse/**'
      - 'node_modules/tslib/**'
      - 'node_modules/uuid/**'
      - 'node_modules/nanoid/**'
  description: 'DB streams to OpenSearch'
  memorySize: 512
  reservedConcurrency: ${self:custom.opensearch.concurrency.sync}
  events:
    - stream:
        type: dynamodb
        arn:
          Fn::Join:
            - ""
            - - "arn:aws:dynamodb:"
              - ${self:provider.region}
              - ":"
              - !Sub ${AWS::AccountId}
              - ":table/"
              - ${self:custom.dynamo.MyTable}
              - "/stream/"
              - ${env:MY_TABLE_STREAM}
        batchSize: 1
        maximumRetryAttempts: 3
```
DBStream Lambda
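Things to note:
- Always wrap the processing of each record in try/catch. Otherwise a single error aborts the loop, the remaining records are never processed, and Lambda retries the failed invocation. In production, failed records should be sent to a DLQ and handled by a dedicated Lambda function, while the DB Stream handler moves on to the next record.
- Data read from DynamoDB, including stream images, carries a type descriptor on every attribute (for example {"S": "9901"} or {"N": "90"}), so it does not match the plain JSON shape of the data. AWS.DynamoDB.Converter.unmarshall (AWS SDK for JavaScript v2) converts it back to plain JavaScript values.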
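```typescript
import AWS from 'aws-sdk';
import OpenSearch from '../common/OpenSearch';

export const handler = async (event: any) => {
  for (const record of event.Records) {
    // Catch per record so that one failure does not stop the rest of the batch
    try {
      // REMOVE events carry no NewImage, so there is nothing to index
      if (!record.dynamodb.NewImage) continue;
      const PK = record.dynamodb.Keys.PK.S;
      // Strip the DynamoDB type descriptors from the Data map attribute
      const jsonData = AWS.DynamoDB.Converter.unmarshall(record.dynamodb.NewImage.Data.M);
      // getFlatJson is the author's helper; it is not shown in this article
      const flatJson = await getFlatJson(jsonData);
      await OpenSearch.addDoc(process.env.OPENSEARCH_INDEX as string, PK, flatJson);
    } catch (error) {
      // In production, send the failed record to a DLQ instead of only logging it
      console.log('error in DB Stream', error);
    }
  }
};
```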
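To show what that conversion does, here is a simplified TypeScript sketch; it is not the AWS SDK implementation. It handles only the S, N, BOOL, NULL, M and L types (sets and binary values are omitted) and, like the SDK v2 converter's default, turns N strings into JavaScript numbers. The names `toPlain` and `unmarshallSketch` are hypothetical.

```typescript
// Subset of DynamoDB's typed attribute values handled by this sketch.
type AttributeValue =
  | { S: string }
  | { N: string }
  | { BOOL: boolean }
  | { NULL: true }
  | { M: Record<string, AttributeValue> }
  | { L: AttributeValue[] };

function toPlain(value: AttributeValue): unknown {
  if ("S" in value) return value.S;
  if ("N" in value) return Number(value.N); // numbers travel as strings
  if ("BOOL" in value) return value.BOOL;
  if ("NULL" in value) return null;
  if ("M" in value) return unmarshallSketch(value.M);
  if ("L" in value) return value.L.map(toPlain);
  throw new Error(`Unsupported attribute value: ${JSON.stringify(value)}`);
}

// Strips the type descriptor from every top-level attribute, recursing into maps and lists.
function unmarshallSketch(item: Record<string, AttributeValue>): Record<string, unknown> {
  const out: Record<string, unknown> = {};
  for (const key of Object.keys(item)) out[key] = toPlain(item[key]);
  return out;
}

// Shape of an image as it appears in a stream record
const newImage: Record<string, AttributeValue> = {
  StudentID: { S: "9901" },
  Score: { N: "90" },
  Tags: { L: [{ S: "midterm" }, { BOOL: true }] },
};
console.log(unmarshallSketch(newImage));
// { StudentID: '9901', Score: 90, Tags: [ 'midterm', true ] }
```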
Using DynamoDB Locally
With a few changes, the serverless.yml above can also run locally (for more details on local development, see here):
```yaml
service: service-02

provider:
  name: aws
  region: eu-west-1
  runtime: nodejs14.x
  lambdaHashingVersion: 20201221
  profile: default

resources:
  Resources:
    MyDynamoDbTable:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: ${self:custom.tableName}
        AttributeDefinitions:
          - AttributeName: StudentID
            AttributeType: S
          - AttributeName: Subject
            AttributeType: S
          - AttributeName: Date
            AttributeType: S
        KeySchema:
          - AttributeName: StudentID
            KeyType: HASH
          - AttributeName: Subject
            KeyType: RANGE
        GlobalSecondaryIndexes:
          - IndexName: Dates
            KeySchema:
              - AttributeName: Date
                KeyType: HASH
              - AttributeName: Subject
                KeyType: RANGE
            Projection:
              ProjectionType: ALL
        BillingMode: PAY_PER_REQUEST

custom:
  tableName: score-table
  dynamodb:
    stages:
      - dev
    start:
      port: 8000
      inMemory: true
      migrate: true
    migration:
      dir: offline/migrations

plugins:
  - serverless-dynamodb-local
  - serverless-offline
```
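Start DynamoDB locally (the DynamoDB Local binaries must be downloaded once beforehand with sls dynamodb install):
```
sls dynamodb start
```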
This requires the following file, offline/migrations/score.json:
```json
{
  "Table": {
    "TableName": "score-table",
    "KeySchema": [
      { "AttributeName": "StudentID", "KeyType": "HASH" },
      { "AttributeName": "Subject", "KeyType": "RANGE" }
    ],
    "AttributeDefinitions": [
      { "AttributeName": "StudentID", "AttributeType": "S" },
      { "AttributeName": "Subject", "AttributeType": "S" },
      { "AttributeName": "Date", "AttributeType": "S" }
    ],
    "GlobalSecondaryIndexes": [
      {
        "IndexName": "Dates",
        "KeySchema": [
          { "AttributeName": "Date", "KeyType": "HASH" },
          { "AttributeName": "Subject", "KeyType": "RANGE" }
        ],
        "Projection": { "ProjectionType": "ALL" },
        "ProvisionedThroughput": { "ReadCapacityUnits": 1, "WriteCapacityUnits": 1 }
      }
    ],
    "ProvisionedThroughput": {
      "ReadCapacityUnits": 1,
      "WriteCapacityUnits": 1
    }
  }
}
```
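A common cause of CreateTable failures with files like this is a mismatch between AttributeDefinitions and the key schemas: every attribute used as a table or GSI key must be defined, and every defined attribute must be used by some key. A hypothetical pre-flight check in TypeScript (`checkAttributeDefinitions`; local secondary indexes are not considered):

```typescript
interface KeyElement { AttributeName: string; KeyType: "HASH" | "RANGE" }
interface TableDef {
  TableName: string;
  KeySchema: KeyElement[];
  AttributeDefinitions: { AttributeName: string; AttributeType: "S" | "N" | "B" }[];
  GlobalSecondaryIndexes?: { IndexName: string; KeySchema: KeyElement[] }[];
}

// Hypothetical check: AttributeDefinitions must match the key attributes exactly.
function checkAttributeDefinitions(table: TableDef): string[] {
  const used = new Set<string>();
  for (const k of table.KeySchema) used.add(k.AttributeName);
  for (const gsi of table.GlobalSecondaryIndexes ?? []) {
    for (const k of gsi.KeySchema) used.add(k.AttributeName);
  }
  const defined = new Set(table.AttributeDefinitions.map((a) => a.AttributeName));
  const problems: string[] = [];
  used.forEach((name) => {
    if (!defined.has(name)) problems.push(`${name} is used in a key but not defined`);
  });
  defined.forEach((name) => {
    if (!used.has(name)) problems.push(`${name} is defined but not used in any key`);
  });
  return problems;
}

const scoreTable: TableDef = {
  TableName: "score-table",
  KeySchema: [
    { AttributeName: "StudentID", KeyType: "HASH" },
    { AttributeName: "Subject", KeyType: "RANGE" },
  ],
  AttributeDefinitions: [
    { AttributeName: "StudentID", AttributeType: "S" },
    { AttributeName: "Subject", AttributeType: "S" },
    { AttributeName: "Date", AttributeType: "S" },
  ],
  GlobalSecondaryIndexes: [
    {
      IndexName: "Dates",
      KeySchema: [
        { AttributeName: "Date", KeyType: "HASH" },
        { AttributeName: "Subject", KeyType: "RANGE" },
      ],
    },
  ],
};

console.log(checkAttributeDefinitions(scoreTable)); // []
const withoutGsi: TableDef = { ...scoreTable, GlobalSecondaryIndexes: undefined };
console.log(checkAttributeDefinitions(withoutGsi)); // [ 'Date is defined but not used in any key' ]
```

Dropping the GSI, or writing it without a proper KeySchema, leaves Date defined but unused, which DynamoDB rejects when creating the table.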