Serverless Tutorial 2: Using DynamoDB


When building serverless applications, creating every resource by hand is slow and error-prone, so it pays to use a framework. This post shows how to create and use an AWS DynamoDB table with the Serverless Framework.


Creating a DynamoDB Table

Suppose we want to create a DynamoDB table for the following student score data:

Student ID, Subject, Score, Date
--------------------------------
9901,       Math,     90,   2021-11-01
9901,       English,  80,   2021-11-02
9902,       Math,     92,   2021-11-01
9903,       Math,     63,   2021-11-01
9904,       Math,     90,   2021-11-01

Define the corresponding resource in serverless.yml:

service: service-02

provider:
  name: aws
  region: eu-west-1
  runtime: nodejs14.x
  lambdaHashingVersion: 20201221
  profile: default

resources:  
  Resources:
    MyDynamoDbTable:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: ${self:custom.tableName}
        AttributeDefinitions:
          - AttributeName: StudentID
            AttributeType: S
          - AttributeName: Subject
            AttributeType: S  
          - AttributeName: Date
            AttributeType: S 
        KeySchema: 
          - AttributeName: StudentID
            KeyType: HASH
          - AttributeName: Subject
            KeyType: RANGE  
        GlobalSecondaryIndexes:
          - IndexName: Dates
            KeySchema:
              - AttributeName: Date
                KeyType: HASH
              - AttributeName: Subject
                KeyType: RANGE
            Projection:
              ProjectionType: ALL    
        BillingMode: PAY_PER_REQUEST

custom:
  tableName: score-table

Note that in KeySchema, HASH denotes the partition key and RANGE denotes the sort key (also called the range key).

In the GSI definition, ProjectionType: ALL means every attribute of the table is copied into the index.
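As a sketch of how the Dates index might be queried, here is a hypothetical helper that builds the Query parameters (the name buildDateQuery is illustrative; the table and index names come from the serverless.yml above):

```typescript
// Builds DocumentClient Query params for the "Dates" GSI defined above.
// "Date" is a reserved word in DynamoDB expressions, so an expression
// attribute name (#d) is used instead.
function buildDateQuery(tableName: string, date: string) {
  return {
    TableName: tableName,
    IndexName: 'Dates',
    KeyConditionExpression: '#d = :d',
    ExpressionAttributeNames: { '#d': 'Date' },
    ExpressionAttributeValues: { ':d': date },
  };
}

// The result would be passed to a client, e.g.:
//   new AWS.DynamoDB.DocumentClient().query(buildDateQuery('score-table', '2021-11-01')).promise()
```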

Deploy to AWS with:

sls deploy

Using TTL (Time To Live)

TTL lets you give individual items a limited lifetime. For example:

TimeToLiveSpecification:
  AttributeName: TimeToLive
  Enabled: true    
BillingMode: PAY_PER_REQUEST

Now, when inserting an item, simply store the desired expiration time (a Unix epoch timestamp in seconds) in the TimeToLive attribute, and DynamoDB will delete the item automatically after that time passes. Note that the exact deletion time is not guaranteed: it is usually prompt, but can take up to 48 hours.
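A small sketch of computing the value to store in the TimeToLive attribute (ttlFromNow is an illustrative helper, not part of any SDK):

```typescript
// TTL values are Unix epoch timestamps in *seconds*, not milliseconds.
function ttlFromNow(days: number, nowMs: number = Date.now()): number {
  return Math.floor(nowMs / 1000) + days * 24 * 60 * 60;
}

// e.g. store { TimeToLive: ttlFromNow(30) } to expire the item in ~30 days
```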

Using DynamoDB Streams

Declaring the stream on the table

First, enable the stream when defining the table in serverless.yml:

resources:
  Resources:
    DemoTable:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: demo
        AttributeDefinitions:
          - AttributeName: PK
            AttributeType: S
        KeySchema:
          - AttributeName: PK
            KeyType: HASH
        ProvisionedThroughput:
          ReadCapacityUnits: 5
          WriteCapacityUnits: 5
        StreamSpecification:
          StreamViewType: NEW_AND_OLD_IMAGES

The available StreamViewType values are:

  • KEYS_ONLY
  • NEW_IMAGE
  • OLD_IMAGE
  • NEW_AND_OLD_IMAGES
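With NEW_AND_OLD_IMAGES, each stream record carries both the new and the old item image, and a handler typically picks one depending on the event type. A minimal sketch, assuming records follow the DynamoDB Streams event shape (eventName plus a dynamodb payload):

```typescript
// Returns the relevant image for a stream record:
// REMOVE events only carry OldImage; INSERT/MODIFY carry NewImage.
function imageFor(record: { eventName: string; dynamodb: { NewImage?: object; OldImage?: object } }) {
  return record.eventName === 'REMOVE' ? record.dynamodb.OldImage : record.dynamodb.NewImage;
}
```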

Granting the Lambda access to the stream

provider:
  iamRoleStatements:
    - Effect: Allow
      Action:
        - dynamodb:DescribeStream
        - dynamodb:GetRecords
        - dynamodb:GetShardIterator
        - dynamodb:ListStreams
      Resource: arn:aws:dynamodb:eu-west-1:375248921228:table/YOUR_TABLE_NAME/stream/YOUR_STREAM_NAME

Note: the Resource above is the ARN of the DynamoDB stream, not the ARN of the table itself.

Deploy the resources to AWS first; you can then look up the stream's label, e.g. "2021-11-20T22:55:25.566".

Associating the Lambda with the stream

functions:
  streamFunction:
    handler: streamFunction.handler
    events:
      - stream:
          type: dynamodb
          arn: 'arn:aws:dynamodb:REGION:ACCOUNT_ID:table/YOUR_TABLE/stream/YOUR_STREAM'
          batchSize: 1
          maximumRetryAttempts: 3

Alternatively, build the ARN with Fn::Join:

events:
  - stream: 
      type: dynamodb
      arn: 
        Fn::Join:
          - ""
          - - "arn:aws:dynamodb:"
            - ${self:provider.region}
            - ":"
            - !Sub ${AWS::AccountId}
            - ":table/"
            - ${self:custom.dynamo.myTable}
            - "/stream/"
            - ${env:MY_TABLE_STREAM}
      batchSize: 1
      maximumRetryAttempts: 3

The stream label then needs to be defined in .env:

MY_TABLE_STREAM=xxxxxxxxxxxxxxx

If the table and the stream Lambda are defined in the same serverless.yml, you can also reference the table resource directly. Note that the attribute to read is StreamArn, not Arn (which would return the table's ARN):

- stream:
    type: dynamodb
    arn:
      Fn::GetAtt:
        - MyTable
        - StreamArn

Pushing data to OpenSearch from a DB stream

In this case the OpenSearch client libraries must be packaged with the function:

DbStreamListener:
  handler: src/functions/MyDbStreamListener.handler
  package:
    patterns:
      - 'node_modules/@aws-sdk/credential-provider-ini/**'
      - 'node_modules/@aws-sdk/credential-provider-node/**'
      - 'node_modules/@aws-sdk/credential-provider-process/**'
      - 'node_modules/@aws-sdk/is-array-buffer/**'
      - 'node_modules/@aws-sdk/util-base64-node/**'
      - 'node_modules/@aws-sdk/util-body-length-node/**'
      - 'node_modules/@aws-sdk/util-buffer-from/**'
      - 'node_modules/@aws-sdk/util-config-provider/**'
      - 'node_modules/@aws-sdk/util-utf8-node/**'
      - 'node_modules/@aws-sdk/util-uri-escape/**'
      - 'node_modules/@opensearch-project/opensearch/**'
      - 'node_modules/aws4/**'
      - 'node_modules/aws-opensearch-connector/**'
      - 'node_modules/debug/**'
      - 'node_modules/hpagent/**'
      - 'node_modules/ms/**'
      - 'node_modules/secure-json-parse/**'
      - 'node_modules/tslib/**'
      - 'node_modules/uuid/**'
      - 'node_modules/nanoid/**'
  description: 'DB streams to OpenSearch'
  memorySize: 512
  reservedConcurrency: ${self:custom.opensearch.concurrency.sync}
  events:
    - stream: 
        type: dynamodb
        arn: 
          Fn::Join:
            - ""
            - - "arn:aws:dynamodb:"
              - ${self:provider.region}
              - ":"
              - !Sub ${AWS::AccountId}
              - ":table/"
              - ${self:custom.dynamo.MyTable}
              - "/stream/"
              - ${env:MY_TABLE_STREAM}
        batchSize: 1
        maximumRetryAttempts: 3

The DB stream Lambda

Things to watch out for:

  • Always wrap the processing in try/catch; otherwise a single failure stops the remaining records from being processed. In production, caught failures should go to a DLQ and be handled by a dedicated Lambda, while the stream handler moves on to the next record.
  • Records delivered by DynamoDB carry a type descriptor on every attribute, which usually does not match the shape your code expects; use AWS.DynamoDB.Converter.unmarshall to convert them.
import AWS from 'aws-sdk';
import OpenSearch from '../common/OpenSearch';

export const handler = async (event: any) => {
  for (let i = 0; i < event.Records.length; i += 1) {
    try {
      const PK = event.Records[i].dynamodb.Keys.PK.S;
      // Convert the type-annotated DynamoDB image into plain JSON
      const jsonData = AWS.DynamoDB.Converter.unmarshall(event.Records[i].dynamodb.NewImage.Data.M);
      const flatJson = await getFlatJson(jsonData); // project-specific helper (not shown here)
      await OpenSearch.addDoc(process.env.OPENSEARCH_INDEX as string, PK, flatJson);
    } catch (error) {
      // Log (or forward to a DLQ) and keep processing the remaining records
      console.log('error in DB Stream', error);
    }
  }
};
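To make the type-descriptor format concrete, here is a simplified re-implementation of what unmarshall does for a few common types (illustration only; in real code use AWS.DynamoDB.Converter.unmarshall, which handles every DynamoDB type):

```typescript
type AttributeValue = { S?: string; N?: string; BOOL?: boolean; M?: Record<string, any> };

// Strips the type descriptors: { Subject: { S: "Math" }, Score: { N: "90" } }
// becomes { Subject: "Math", Score: 90 }.
function simpleUnmarshall(item: Record<string, AttributeValue>): Record<string, any> {
  const out: Record<string, any> = {};
  for (const [key, av] of Object.entries(item)) {
    if (av.S !== undefined) out[key] = av.S;
    else if (av.N !== undefined) out[key] = Number(av.N); // numbers arrive as strings
    else if (av.BOOL !== undefined) out[key] = av.BOOL;
    else if (av.M !== undefined) out[key] = simpleUnmarshall(av.M); // nested map
  }
  return out;
}
```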

Using DynamoDB locally

With a few changes, the serverless.yml above can run against a local DynamoDB; for more details on local development, see here.

service: service-02

provider:
  name: aws
  region: eu-west-1
  runtime: nodejs14.x
  lambdaHashingVersion: 20201221
  profile: default

resources:  
  Resources:
    MyDynamoDbTable:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: ${self:custom.tableName}
        AttributeDefinitions:
          - AttributeName: StudentID
            AttributeType: S
          - AttributeName: Subject
            AttributeType: S  
          - AttributeName: Date
            AttributeType: S 
        KeySchema: 
          - AttributeName: StudentID
            KeyType: HASH
          - AttributeName: Subject
            KeyType: RANGE  
        GlobalSecondaryIndexes:
          - IndexName: Dates
            KeySchema:
              - AttributeName: Date
                KeyType: HASH
              - AttributeName: Subject
                KeyType: RANGE
            Projection:
              ProjectionType: ALL    
        BillingMode: PAY_PER_REQUEST

custom:
  tableName: score-table
  dynamodb:
    stages:
      - dev
    start:
      port: 8000
      inMemory: true
      migrate: true
    migration:
      dir: offline/migrations


plugins:
  - serverless-dynamodb-local
  - serverless-offline

Start DynamoDB locally:

sls dynamodb start

This requires a migration file, offline/migrations/score.json:

{
    "Table": {
        "TableName": "score-table",
        "KeySchema": [
            {
                "AttributeName": "StudentID",
                "KeyType": "HASH"
            },
            {
                "AttributeName": "Subject",
                "KeyType": "RANGE"
            }
        ],
        "AttributeDefinitions": [
            {
                "AttributeName": "StudentID",
                "AttributeType": "S"
            },
            {
                "AttributeName": "Subject",
                "AttributeType": "S"
            },
            {
                "AttributeName": "Date",
                "AttributeType": "S"
            }
        ],
        "GlobalSecondaryIndexes": [
            {
                "IndexName": "Dates",
                "KeySchema": [
                    {
                        "AttributeName": "Date",
                        "KeyType": "HASH"
                    },
                    {
                        "AttributeName": "Subject",
                        "KeyType": "RANGE"
                    }
                ],
                "Projection": {
                    "ProjectionType": "ALL"
                },
                "ProvisionedThroughput": {
                    "ReadCapacityUnits": 1,
                    "WriteCapacityUnits": 1
                }
            }
        ],
        "ProvisionedThroughput": {
            "ReadCapacityUnits": 1,
            "WriteCapacityUnits": 1
        }
    }
}
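With the local instance running, point the client at it instead of AWS. A sketch of the client configuration (localDynamoOptions is an illustrative helper; the port matches custom.dynamodb.start.port above):

```typescript
// Client options for a local DynamoDB started by serverless-dynamodb-local.
function localDynamoOptions(port: number = 8000) {
  return {
    region: 'localhost',
    endpoint: `http://localhost:${port}`,
  };
}

// e.g.: new AWS.DynamoDB.DocumentClient(localDynamoOptions())
```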

Author: 逻思
License: unless otherwise noted, all posts on this blog are licensed under CC BY-NC-ND 4.0. Please credit 逻思 when reposting.