作为AWS众多云服务的核心成员之一,DynamoDB得到了非常广泛的应用。下面就通过一系列教程来介绍一下如何通过AWS SDK来操作DynamoDB。本次主要介绍如何在Node.js中访问DynamoDB中的资源,比如如何创建表,然后进行增删改查等操作。

AWS DynamoDB系列教程:
- AWS DynamoDB系列之一:简介
- AWS DynamoDB教程之二:主键的设计及GSI
- AWS DynamoDB系列之三:Streams
- AWS DynamoDB系列之四:在Node.js中访问DynamoDB
- AWS DynamoDB系列之五:在本地安装DynamoDB
- AWS DynamoDB教程之六:如何使用APIGateway Service Proxy访问DynamoDB数据
- AWS DynamoDB教程之七:DynamoDB的访问控制
- AWS DynamoDB教程之八:数据备份/恢复及导出
- AWS DynamoDB教程之九:性能监测和调优,Audit Table及TTL
创建目录并安装依赖包
mkdir demo
cd demo
npm install aws-sdk
AWS SDK提供的两种访问方式
在使用AWS SDK访问DynamoDB的时候,可以选择Service interface或者Document client:
- Service interface: 这种方式会提供更多的选项
- Document client:这种方式更加易用
let AWS = require("aws-sdk");
AWS.config.update({region: 'eu-west-1'});
const tableName = "ScoreTable"
// Service interface
let dynamodb = new AWS.DynamoDB({apiVersion: '2012-08-10'});
// Document Client
let ddbDocumentClient = new AWS.DynamoDB.DocumentClient();
使用不同的AWS profile
if (!options.AWS_Profile) throw Error('AWS user profile must be provided')
const credentials = new AWS.SharedIniFileCredentials({ profile: options.AWS_Profile });
AWS.config.update({ region: options.region ? options.region : 'eu-west-1' });
AWS.config.credentials = credentials;
const DynamoDB = new AWS.DynamoDB.DocumentClient();
创建表(CREATE)
在创建DynamoDB的时候,只需指定主键相关的字段即可,如果Partition Key是主键,只需要声明该字段。如果是由Partition Key和Sort Key组合形成主键,则都需指明。
- 指明Partition Key需要使用”HASH”
- 指明Sort Key则需使用“RANGE”
// npm install aws-sdk
let AWS = require("aws-sdk");
AWS.config.update({region: "eu-west-1"});
let dynamodb = new AWS.DynamoDB({apiVersion: "2012-08-10"});
let params = {
AttributeDefinitions: [
{
AttributeName: "studentId",
AttributeType: "S"
},
{
AttributeName: "subject",
AttributeType: "S"
}
],
KeySchema: [
{
AttributeName: "studentId",
KeyType: "HASH"
},
{
AttributeName: "subject",
KeyType: "RANGE"
}
],
BillingMode: "PAY_PER_REQUEST",
TableName: "ScoreTable"
};
dynamodb.createTable(params, function(err, data) {
if (err) {
console.log("Error", err);
} else {
console.log("Table Created", data);
}
});
创建LSI/GSI时使用的参数:
const params = {
TableName: 'temp',
KeySchema: [
{ AttributeName: 'category', KeyType: 'HASH' },
{ AttributeName: 'id', KeyType: 'RANGE' },
],
AttributeDefinitions: [
{ AttributeName: 'category', AttributeType: 'S' },
{ AttributeName: 'id', AttributeType: 'S' },
{ AttributeName: 'year', AttributeType: 'S' },
],
LocalSecondaryIndexes: [
{
IndexName: 'yearIndex',
KeySchema: [
{ AttributeName: 'category', KeyType: 'HASH' },
{ AttributeName: 'year', KeyType: 'RANGE' },
],
Projection: {
ProjectionType: 'KEYS_ONLY',
},
},
],
BillingMode: 'PAY_PER_REQUEST',
};
插入数据(INSERT)
用到的数据文件scores.json:
{
"scores":
[
{
"studentId": "9901",
"subject": "Math",
"score": "90",
"date": "2021-11-01 13:05:00"
},
{
"studentId": "9901",
"subject": "English",
"score": "80",
"date": "2021-10-25 13:05:00"
},
{
"studentId": "9902",
"subject": "Math",
"score": "63",
"date": "2021-11-01 13:05:00"
},
{
"studentId": "9903",
"subject": "English",
"score": "75",
"date": "2021-10-01 13:05:00"
}
]
}
插入数据的JS代码:
let AWS = require("aws-sdk");
let fs = require("fs");
AWS.config.update({region: "eu-west-1"});
let documentClient = new AWS.DynamoDB.DocumentClient();
let scores = JSON.parse(fs.readFileSync('scores.json', 'utf8'));
scores["scores"].forEach(function(score) {
var params = {
TableName: "ScoreTable",
Item: {
"studentId": score.studentId,
"subject": score.subject,
"score": score.score,
"date": score.date
}
};
documentClient.put(params, function(err, data) {
if (err) {
console.error("Error while inserting data.");
} else {
console.log("Succeeded adding a record ");
}
});
});
更新数据(UPDATE)
async update(tableName: string, pk: string, sk: string, data: string) {
const params = {
TableName: tableName,
Key: {
PK: pk,
SK: sk,
},
UpdateExpression: 'set #data = :data',
ExpressionAttributeNames: {
'#data': 'Data',
},
ExpressionAttributeValues: {
':data': data,
},
ReturnValues: 'UPDATED_NEW',
};
return documentClient.update(params).promise();
},
删除数据
async delete(tableName: string, pk: string, sk: string) {
const params = {
TableName: tableName,
Key: {
PK: pk,
SK: sk,
},
};
return documentClient.delete(params).promise();
}
获取单条数据(GetItem)
async function getSingleScore(){
try {
var params = {
Key: {
"studentId": "9901",
"subject": "Math"
},
TableName: tableName
};
var result = await DocumentClient.get(params).promise()
console.log(JSON.stringify(result))
} catch (error) {
console.error(error);
}
}
getSingleScore()
获取多条数据
let AWS = require("aws-sdk");
AWS.config.update({region: 'eu-west-1'});
const tableName = "ScoreTable"
let ddbDocumentClient = new AWS.DynamoDB.DocumentClient();
async function scoresByAuthor(){
try {
var params = {
KeyConditionExpression: 'studentId = :studentId',
ExpressionAttributeValues: {
':studentId': '9901'
},
TableName: tableName
};
var result = await ddbDocumentClient.query(params).promise()
console.log(JSON.stringify(result))
} catch (error) {
console.error(error);
}
}
scoresByAuthor()
其返回值为:
{
"Items":[
{
"subject":"English",
"score":"80",
"date":"2021-10-25 13:05:00",
"studentId":"9901"
},
{
"subject":"Math",
"score":"90",
"date":"2021-11-01 13:05:00",
"studentId":"9901"
}
],
"Count":2,
"ScannedCount":2
}
同时根据PK和SK查询
有些时候需要同时根据Partition Key和Sort Key来进行查询。
let AWS = require("aws-sdk");
AWS.config.update({region: 'eu-west-1'});
const tableName = "ScoreTable"
let ddbDocumentClient = new AWS.DynamoDB.DocumentClient();
async function queryPkSK(){
try {
var params = {
KeyConditionExpression: 'studentId = :studentId AND subject = :subject',
ExpressionAttributeValues: {
':studentId': '9901',
':subject': 'Math'
},
TableName: tableName
};
var result = await ddbDocumentClient.query(params).promise()
console.log(JSON.stringify(result))
} catch (error) {
console.error(error);
}
}
queryPkSK()
查询GSI/LSI
const params = {
TableName: 'YOUR_TABLE_NAME',
IndexName: 'GSI1',
KeyConditionExpression: 'GSIPK = :pk and GSISK = :sk',
ExpressionAttributeValues: { ':pk': 'sport', ':sk': '2022' },
};
console.log(params);
const result = await Dynamo.query(params);
过滤FilterExpression
FilterExpression的作用是过滤查询/Scan结果,因此这个操作是在Query/Scan发生的。需要注意的是,FilterExpression只是对查询结果的一个辅助处理,不应被过度使用。否则,可能数据库的设计有问题。
如果需要频繁使用FilterExpression,则应考虑使用GSI。
示例代码:
const AWS = require('aws-sdk');
const client = new AWS.DynamoDB.DocumentClient({apiVersion: '2012-08-10'});
const results = await client.scan({
TableName: 'SalesTable',
FilterExpression: 'Sales >= :platinum' AND begins_with(ProductName, :song),
ExpressionAttributeValues: {
':platinum': 1000000,
':song': 'SONG#'
}
})
Scan一个表
可以通过Scan的方式返回表中所有数据,同时也可以加filter过滤。按scan的效率过低,一般应避免使用。
let AWS = require("aws-sdk");
AWS.config.update({region: 'eu-west-1'});
const tableName = "ScoreTable"
let ddbDocumentClient = new AWS.DynamoDB.DocumentClient();
async function scanTable(){
try {
var params = {
TableName: tableName
};
var result = await ddbDocumentClient.scan(params).promise()
console.log(JSON.stringify(result))
} catch (error) {
console.error(error);
}
}
scanTable()
分页Pagination
默认情况下,DynamoDB返回数据尺寸是有限制的,1MB。因此这就有可能和pagination冲突,比如按照设定,应返回10条记录,但只需要8条记录就已经1MB了,这个时候后面的记录就无法返回了。为了解决这个问题,在DynamoDB中,有几个概念:
- QueryPageSize:每页记录数
- LastEvaluatedKey:这就是最近一次返回数据的指针。通过这个指针,就能准确定义到下一条记录。如果没有后续数据了,则返回null。
- ExclusiveStaretKey: 这就是本次返回数据的起始点。通过这种方式就能够准确定位本页中记录的起始位置。
几个注意事项:
- 无法使用多线程同时获取不同页面,或者同一页面的不同数据。这主要是受限于LastEvaluatedKey。但在进行Scan的时候是可以这样做的。
- 在API中使用pagination时,不要试图返回所有数据,最好也返回不同页面的数据。
简单的pagination示例:
let AWS = require("aws-sdk");
AWS.config.update({region: 'eu-west-1'});
const tableName = "ScoreTable"
let ddbDocumentClient = new AWS.DynamoDB.DocumentClient();
async function getAllScores(){
const getData = async (params, startKey) => {
if (startKey) {
params.ExclusiveStartKey = startKey
}
return ddbDocumentClient.query(params).promise()
}
let params = {
KeyConditionExpression: 'studentId = :studentId',
ExpressionAttributeValues: {
':studentId': '9901'
},
TableName: tableName
};
let lastEvaluatedKey = null
let result_list = []
do {
const result = await getData(params, lastEvaluatedKey)
result_list = result_list.concat(result.Items)
lastEvaluatedKey = result.LastEvaluatedKey
} while (lastEvaluatedKey)
return result_list
}
getAllScores().then(result => {
console.log(result);
})