AWS DynamoDB教程之三:Streams


作为AWS众多云服务的核心成员之一,DynamoDB得到了非常广泛的应用。下面就通过一系列教程来介绍一下DynamoDB的使用。本次主要介绍一下DynamoDB中Streams(流)的使用。Streams的主要作用就是可用来捕获 DynamoDB 表中的数据修改事件并触发相应操作,比如更新Opensearch中的索引。

AWS DynamoDB
AWS DynamoDB

AWS DynamoDB系列教程:

什么是DynamoDB Streams

DynamoDB中流的主要作用就是当DynamoDB中的记录被修改时,比如INSERT,UPDATE,DELETE时,触发相应的事件。在Stream中可以非常灵活的定义触发条件,比如在更改之前/之后触发相应操作。其特点包括:

  • 不影响源表的性能
  • 灵活的事件监测机制,包括Keys only,New Image,Old Image,New & Old Image
  • 支持批处理
  • 和Lambda的结合非常简单
  • 其实这些对表的操作都会被记录在一个log中(up to 24 hours)
  • 可以使用CreateTable / UpdateTalbe API来启用或更改一个stream

几个重要概念

  • StreamSpecification: 决定stream的设置如何
  • StreamEnabled: 决定是否启用/禁用一个stream
  • StreamViewType: 决定那些信息将被写入到stream中: KEYS_ONLY, NEW_IMAGE, OLD_IMAGE, NEW_AND_OLD_IMAGES

下面介绍一个应用场景:

实时面板

比如在一场大型体育赛事中,有一个表存储各项运动的最新成绩。同时有另一个数据源,比如OpenSearch存储各国总成绩排行榜。这时就可以使用Streams来检测运动成绩表,一旦数据发生变化,自动更新OpenSearch中的排行榜。客户端只需要不断从OpenSearch获取数据就好。

Streams处理流程

在DynamoDB中,储存那些变更的数据区叫做shard。

在DynamoDB中,待处理的数据类似这样:

json
{
  "Records": [
    {
      "eventID": "123434",
      "eventName": "INSERT",
      "eventVersion": "1.0",
      "eventSource": "aws:dynamodb",
      "awsRegion": "eu-west-1",
      "dynamodb": {
        "NewImage": {
          "teamId": {
            "S": "23sfae88x"
          },
          "date": {
            "S": "Nov 10 2021 13:00:10"
          },
          "sportId": {
            "S": "s889234"
          },
          "score": {
            "N": "20"
          }
        }
      }
    }
  ]
}

同时需要创建一个角色,具备以下权限:

json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "dynamodb:DescribeStream",
        "dynamodb:GetRecords",
        "dynamodb:GetShardIterator",
        "dynamodb:ListStreams",
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": "*"
    }
  ]
}

示例:创建一个Stream

在DynamoDB中创建表

创建表:SportEvents

Partition Key Sort Key
teamId date

AWS DynamoDB教程
AWS DynamoDB教程

创建IAM Role

创建一个新的Role,比如叫做DBStreamLambdaRole。该Role具有对DynamoDB及CloudWatch的访问权限。可以使用以下的policy:

  • AWSLambdaDynamoDBExecutionRole

AWS DynamoDB教程
AWS DynamoDB教程

创建Lambda

建议在Lambda函数中,所有代码都包含在try catch中,这样一旦处理过程中出现问题,可以发送消息给开发者及早解决问题。否则DynamoDB会一直尝试重新运行代码。

选择“Author from scratch”,选择刚创建的Role。

Javascript版本的Lambda代码:

Javascript
exports.handler = async (event, context, callback) => {
    try{
        console.log(JSON.stringify(event, null, 2));
        event.Records.forEach(function(record) {
            console.log("-------event start--------");
            console.log(record.eventID);
            console.log(record.eventName);
            console.log('DynamoDB Record: %j', record.dynamodb);
            console.log("-------event end--------");
        });
        callback(null, "message");
    }catch(err) {
        console.log("----Something went wrong!-----");
        console.log(err);
    } 
};

Python版本的Lambda:

Python
import json

def lambda_handler(event, context):
  try:
    for record in event['Records']:
      if record['eventName'] == 'INSERT':
        handle_insert(record)
      elif record['eventName'] == 'MODIFY':
        handle_modify(record)
      elif record['eventName'] == 'REMOVE':
        handle_remove(record)  
  except Exception as e:
    print("Something went wrong!")
    print(e)  

def handle_insert(record):
  print("-------insert--------")
  newImage = record['dynamodb']['NewImage']
  newTeamId = newImage['teamId']['S']
  print(newTeamId)
  print("-------end of insert--------")

def handle_modify(record):
  print("-------modify--------")
  oldImage = record['dynamodb']['OldImage']
  oldScore = oldImage['score']['N']

  newImage = record['dynamodb']['NewImage']
  newScore = newImage['score']['N']

  if oldScore != newScore:
    print(str(oldScore) + ' => ' + str(newScore))
  print("-------end of modify--------")  

def handle_remove(record):
  print("-------remove--------")    
  oldImage = record['dynamodb']['OldImage']
  oldTeamId = oldImage['teamId']['S']
  print('Removed: ' + oldTeamId)
  print("-------end of remove--------")

注意将前面创建的“DBStreamLambdaRole”赋予这个Lambda

添加DynamoDB到Lambda的触发器

这个触发器既可以在Lambda中定义,也可以在DynamoDB中定义。

选择前面创建的SportEvents表,然后选择Exports and streams。

AWS DynamoDB教程
AWS DynamoDB教程

选择”Enable stream”

选择”Create a trigger”然后选择前面刚创建的Lambda。

测试

首先在DynamoDB中插入一条数据:

json
{
  "teamId": {
    "S": "91001"
  },
  "date": {
    "S": "2021-11-01 15:20:00"
  },
  "sportId": {
    "S": "aeji02"  
  },
  "score": {
    "N": "25"  
  }
}

然后更新score。

之后删除该记录。

然后进入CloudWatch查看日志,就可以看到类似的日志:

text
-------insert--------
91001
-------end of insert--------
END RequestId: xxxx
REPORT RequestId: xxxx	Duration: 1.37 ms	Billed Duration: 2 ms	Memory Size: 128 MB	Max Memory Used: 37 MB	Init Duration: 111.81 ms	
START RequestId: xxxx Version: $LATEST
-------modify--------
25 => 35
-------end of modify--------

文章作者: 逻思
版权声明: 本博客所有文章除特別声明外,均采用 CC BY-NC-ND 4.0 许可协议。转载请注明来源 逻思 !