作为AWS众多云服务的核心成员之一,DynamoDB得到了非常广泛的应用。下面就通过一系列教程来介绍一下DynamoDB的使用。本次主要介绍一下DynamoDB中Streams(流)的使用。Streams的主要作用就是可用来捕获 DynamoDB 表中的数据修改事件并触发相应操作,比如更新Opensearch中的索引。

AWS DynamoDB系列教程:
- AWS DynamoDB系列之一:简介
- AWS DynamoDB教程之二:主键的设计及GSI
- AWS DynamoDB系列之三:Streams
- AWS DynamoDB系列之四:在Node.js中访问DynamoDB
- AWS DynamoDB系列之五:在本地安装DynamoDB
- AWS DynamoDB教程之六:如何使用APIGateway Service Proxy访问DynamoDB数据
- AWS DynamoDB教程之七:DynamoDB的访问控制
- AWS DynamoDB教程之八:数据备份/恢复及导出
- AWS DynamoDB教程之九:性能监测和调优,Audit Table及TTL
什么是DynamoDB Streams
DynamoDB中流的主要作用就是当DynamoDB中的记录被修改时,比如INSERT,UPDATE,DELETE时,触发相应的事件。在Stream中可以非常灵活的定义触发条件,比如在更改之前/之后触发相应操作。其特点包括:
- 不影响源表的性能
- 灵活的事件监测机制,包括Keys only,New Image,Old Image,New & Old Image
- 支持批处理
- 和Lambda的结合非常简单
- 其实这些对表的操作都会被记录在一个log中(up to 24 hours)
- 可以使用CreateTable / UpdateTalbe API来启用或更改一个stream
几个重要概念
- StreamSpecification: 决定stream的设置如何
- StreamEnabled: 决定是否启用/禁用一个stream
- StreamViewType: 决定那些信息将被写入到stream中: KEYS_ONLY, NEW_IMAGE, OLD_IMAGE, NEW_AND_OLD_IMAGES
下面介绍一个应用场景:
实时面板
比如在一场大型体育赛事中,有一个表存储各项运动的最新成绩。同时有另一个数据源,比如OpenSearch存储各国总成绩排行榜。这时就可以使用Streams来检测运动成绩表,一旦数据发生变化,自动更新OpenSearch中的排行榜。客户端只需要不断从OpenSearch获取数据就好。
Streams处理流程
在DynamoDB中,储存那些变更的数据区叫做shard。
在DynamoDB中,待处理的数据类似这样:
{
"Records": [
{
"eventID": "123434",
"eventName": "INSERT",
"eventVersion": "1.0",
"eventSource": "aws:dynamodb",
"awsRegion": "eu-west-1",
"dynamodb": {
"NewImage": {
"teamId": {
"S": "23sfae88x"
},
"date": {
"S": "Nov 10 2021 13:00:10"
},
"sportId": {
"S": "s889234"
},
"score": {
"N": "20"
}
}
}
}
]
}
同时需要创建一个角色,具备以下权限:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"dynamodb:DescribeStream",
"dynamodb:GetRecords",
"dynamodb:GetShardIterator",
"dynamodb:ListStreams",
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": "*"
}
]
}
示例:创建一个Stream
在DynamoDB中创建表
创建表:SportEvents
Partition Key | Sort Key |
---|---|
teamId | date |

创建IAM Role
创建一个新的Role,比如叫做DBStreamLambdaRole。该Role具有对DynamoDB及CloudWatch的访问权限。可以使用以下的policy:
- AWSLambdaDynamoDBExecutionRole

创建Lambda
建议在Lambda函数中,所有代码都包含在try catch中,这样一旦处理过程中出现问题,可以发送消息给开发者及早解决问题。否则DynamoDB会一直尝试重新运行代码。
选择“Author from scratch”,选择刚创建的Role。
Javascript版本的Lambda代码:
exports.handler = async (event, context, callback) => {
try{
console.log(JSON.stringify(event, null, 2));
event.Records.forEach(function(record) {
console.log("-------event start--------");
console.log(record.eventID);
console.log(record.eventName);
console.log('DynamoDB Record: %j', record.dynamodb);
console.log("-------event end--------");
});
callback(null, "message");
}catch(err) {
console.log("----Something went wrong!-----");
console.log(err);
}
};
Python版本的Lambda:
import json
def lambda_handler(event, context):
try:
for record in event['Records']:
if record['eventName'] == 'INSERT':
handle_insert(record)
elif record['eventName'] == 'MODIFY':
handle_modify(record)
elif record['eventName'] == 'REMOVE':
handle_remove(record)
except Exception as e:
print("Something went wrong!")
print(e)
def handle_insert(record):
print("-------insert--------")
newImage = record['dynamodb']['NewImage']
newTeamId = newImage['teamId']['S']
print(newTeamId)
print("-------end of insert--------")
def handle_modify(record):
print("-------modify--------")
oldImage = record['dynamodb']['OldImage']
oldScore = oldImage['score']['N']
newImage = record['dynamodb']['NewImage']
newScore = newImage['score']['N']
if oldScore != newScore:
print(str(oldScore) + ' => ' + str(newScore))
print("-------end of modify--------")
def handle_remove(record):
print("-------remove--------")
oldImage = record['dynamodb']['OldImage']
oldTeamId = oldImage['teamId']['S']
print('Removed: ' + oldTeamId)
print("-------end of remove--------")
注意将前面创建的“DBStreamLambdaRole”赋予这个Lambda。
添加DynamoDB到Lambda的触发器
这个触发器既可以在Lambda中定义,也可以在DynamoDB中定义。
选择前面创建的SportEvents表,然后选择Exports and streams。

选择”Enable stream”
选择”Create a trigger”然后选择前面刚创建的Lambda。
测试
首先在DynamoDB中插入一条数据:
{
"teamId": {
"S": "91001"
},
"date": {
"S": "2021-11-01 15:20:00"
},
"sportId": {
"S": "aeji02"
},
"score": {
"N": "25"
}
}
然后更新score。
之后删除该记录。
然后进入CloudWatch查看日志,就可以看到类似的日志:
-------insert--------
91001
-------end of insert--------
END RequestId: xxxx
REPORT RequestId: xxxx Duration: 1.37 ms Billed Duration: 2 ms Memory Size: 128 MB Max Memory Used: 37 MB Init Duration: 111.81 ms
START RequestId: xxxx Version: $LATEST
-------modify--------
25 => 35
-------end of modify--------