AWS DynamoDB教程之四:在Node.js中访问DynamoDB


作为AWS众多云服务的核心成员之一,DynamoDB得到了非常广泛的应用。下面就通过一系列教程来介绍一下如何通过AWS SDK来操作DynamoDB。本次主要介绍如何在Node.js中访问DynamoDB中的资源,比如如何创建表,然后进行增删改查等操作。

AWS DynamoDB
AWS DynamoDB

AWS DynamoDB系列教程:

创建目录并安装依赖包

bash
# Create a working directory for the examples and switch into it.
mkdir demo
cd demo
# Install the AWS SDK package that the snippets load via require("aws-sdk").
npm install aws-sdk

AWS SDK提供的两种访问方式

在使用AWS SDK访问DynamoDB的时候,可以选择Service interface或者Document client:

  • Service interface: 这种方式会提供更多的选项
  • Document client:这种方式更加易用
Javascript
const AWS = require("aws-sdk");

// Region applied to every client created after this call.
AWS.config.update({ region: 'eu-west-1' });

// Table used throughout the examples.
const tableName = "ScoreTable";

// Service interface: low-level client, pinned to an explicit API version.
const dynamodb = new AWS.DynamoDB({ apiVersion: '2012-08-10' });

// Document client: higher-level client that accepts plain JavaScript values.
const ddbDocumentClient = new AWS.DynamoDB.DocumentClient();

使用不同的AWS profile

JavaScript
// NOTE(review): `options` and `AWS` are not defined in this snippet; they are
// expected to be in scope already (e.g. function arguments and require("aws-sdk")).

// Refuse to continue without an explicit profile name.
if (!options.AWS_Profile) {
  throw new Error('AWS user profile must be provided');
}

// Fall back to eu-west-1 when the caller did not supply a region.
AWS.config.update({ region: options.region || 'eu-west-1' });

// Load the credentials stored under the named profile of the shared credentials file.
AWS.config.credentials = new AWS.SharedIniFileCredentials({
  profile: options.AWS_Profile,
});

// Created after the configuration above has been applied.
const DynamoDB = new AWS.DynamoDB.DocumentClient();

创建表(CREATE)

在创建DynamoDB表的时候,只需指定主键相关的字段即可:如果主键仅由Partition Key构成,只需要声明该字段;如果主键由Partition Key和Sort Key组合而成,则两者都需指明。

  • 指明Partition Key需要使用“HASH”
  • 指明Sort Key则需使用“RANGE”
Javascript
// npm install aws-sdk
const AWS = require("aws-sdk");
AWS.config.update({ region: "eu-west-1" });

// createTable is called on the service-interface client.
const dynamodb = new AWS.DynamoDB({ apiVersion: "2012-08-10" });

// Only the attributes that take part in the primary key are declared:
// studentId is the partition key (HASH), subject is the sort key (RANGE).
const params = {
  TableName: "ScoreTable",
  BillingMode: "PAY_PER_REQUEST",
  KeySchema: [
    { AttributeName: "studentId", KeyType: "HASH" },
    { AttributeName: "subject", KeyType: "RANGE" }
  ],
  AttributeDefinitions: [
    { AttributeName: "studentId", AttributeType: "S" },
    { AttributeName: "subject", AttributeType: "S" }
  ]
};

// Report the outcome of the request once the callback fires.
dynamodb.createTable(params, (err, data) => {
  if (err) {
    console.log("Error", err);
    return;
  }
  console.log("Table Created", data);
});

创建LSI/GSI时使用的参数:

JavaScript
// Table definition that also creates a local secondary index (LSI).
// The index keeps the table's partition key (`category`) and uses `year`
// as its sort key instead of `id`.
const params = {
  TableName: 'temp',
  BillingMode: 'PAY_PER_REQUEST',

  // Every attribute used as a key by the table or by the index is declared here.
  AttributeDefinitions: [
    { AttributeName: 'category', AttributeType: 'S' },
    { AttributeName: 'id', AttributeType: 'S' },
    { AttributeName: 'year', AttributeType: 'S' },
  ],

  // Primary key of the table itself.
  KeySchema: [
    { AttributeName: 'category', KeyType: 'HASH' },
    { AttributeName: 'id', KeyType: 'RANGE' },
  ],

  LocalSecondaryIndexes: [
    {
      IndexName: 'yearIndex',
      KeySchema: [
        { AttributeName: 'category', KeyType: 'HASH' },
        { AttributeName: 'year', KeyType: 'RANGE' },
      ],
      // Project only the key attributes into the index.
      Projection: { ProjectionType: 'KEYS_ONLY' },
    },
  ],
};

插入数据(INSERT)

用到的数据文件scores.json:

json
{
    "scores":
    [
        {
            "studentId": "9901",
            "subject": "Math",
            "score": "90",
            "date": "2021-11-01 13:05:00"
        },
        {
            "studentId": "9901",
            "subject": "English",
            "score": "80",
            "date": "2021-10-25 13:05:00"
        },
        {
            "studentId": "9902",
            "subject": "Math",
            "score": "63",
            "date": "2021-11-01 13:05:00"
        },
        {
            "studentId": "9903",
            "subject": "English",
            "score": "75",
            "date": "2021-10-01 13:05:00"
        }
    ]
}

插入数据的JS代码:

Javascript
const AWS = require("aws-sdk");
const fs = require("fs");
AWS.config.update({region: "eu-west-1"});

const documentClient = new AWS.DynamoDB.DocumentClient();

// scores.json holds an object of the form { "scores": [ ...records ] }.
const scores = JSON.parse(fs.readFileSync('scores.json', 'utf8'));

// Issue one put request per record; the requests are not awaited, so the
// callbacks may fire in any order.
scores["scores"].forEach(function(score) {
  const params = {
    TableName: "ScoreTable",
    Item: {
      "studentId":  score.studentId,
      "subject": score.subject,
      "score":  score.score,
      "date": score.date
    }
  };

  documentClient.put(params, function(err) {
    if (err) {
      // Include the underlying error; without it a failed insert cannot be diagnosed.
      console.error("Error while inserting data.", err);
    } else {
      console.log("Succeeded adding a record ");
    }
  });
});

更新数据(UPDATE)

JavaScript
/**
 * Overwrites the `Data` attribute of a single item.
 *
 * @param tableName - table that holds the item
 * @param pk - value of the item's `PK` key attribute
 * @param sk - value of the item's `SK` key attribute
 * @param data - new value stored under `Data`
 * @returns the promise produced by `documentClient.update(...).promise()`
 */
async update(tableName: string, pk: string, sk: string, data: string) {
  const Key = { PK: pk, SK: sk };

  return documentClient
    .update({
      TableName: tableName,
      Key,
      // `#data` and `:data` are placeholders resolved through the two maps below.
      UpdateExpression: 'set #data = :data',
      ExpressionAttributeNames: { '#data': 'Data' },
      ExpressionAttributeValues: { ':data': data },
      // Ask for the updated attributes only, with their post-update values.
      ReturnValues: 'UPDATED_NEW',
    })
    .promise();
},

删除数据

JavaScript
/**
 * Deletes the item identified by its `PK` / `SK` key pair.
 *
 * @param tableName - table that holds the item
 * @param pk - value of the item's `PK` key attribute
 * @param sk - value of the item's `SK` key attribute
 * @returns the promise produced by `documentClient.delete(...).promise()`
 */
async delete(tableName: string, pk: string, sk: string) {
  const Key = { PK: pk, SK: sk };

  return documentClient.delete({ TableName: tableName, Key }).promise();
}

获取单条数据(GetItem)

Javascript
/**
 * Fetches one item by its full primary key (studentId + subject) and prints
 * the raw response as JSON. Errors are logged rather than rethrown.
 *
 * Relies on `tableName` and `ddbDocumentClient` already being in scope
 * (they are created in the SDK setup snippet).
 */
async function getSingleScore(){
    try {
        const params = {
            Key: {
             "studentId": "9901",
             "subject": "Math"
            },
            TableName: tableName
        };
        // Call get() on the client instance; `DocumentClient` alone is not a
        // defined identifier and would raise a ReferenceError.
        const result = await ddbDocumentClient.get(params).promise()
        console.log(JSON.stringify(result))
    } catch (error) {
        console.error(error);
    }
}

getSingleScore()

获取多条数据

Javascript
const AWS = require("aws-sdk");
AWS.config.update({ region: 'eu-west-1' });

const tableName = "ScoreTable";
const ddbDocumentClient = new AWS.DynamoDB.DocumentClient();

/**
 * Queries every item whose partition key `studentId` equals '9901' and
 * prints the raw response as JSON. Errors are logged rather than rethrown.
 */
async function scoresByAuthor() {
    const queryInput = {
        TableName: tableName,
        KeyConditionExpression: 'studentId = :studentId',
        ExpressionAttributeValues: { ':studentId': '9901' },
    };

    try {
        const response = await ddbDocumentClient.query(queryInput).promise();
        console.log(JSON.stringify(response));
    } catch (error) {
        console.error(error);
    }
}

scoresByAuthor();

其返回值为:

json
{
   "Items":[
      {
         "subject":"English",
         "score":"80",
         "date":"2021-10-25 13:05:00",
         "studentId":"9901"
      },
      {
         "subject":"Math",
         "score":"90",
         "date":"2021-11-01 13:05:00",
         "studentId":"9901"
      }
   ],
   "Count":2,
   "ScannedCount":2
}

同时根据PK和SK查询

有些时候需要同时根据Partition Key和Sort Key来进行查询。

Javascript
const AWS = require("aws-sdk");
AWS.config.update({ region: 'eu-west-1' });

const tableName = "ScoreTable";
const ddbDocumentClient = new AWS.DynamoDB.DocumentClient();

/**
 * Queries with a condition on both the partition key (`studentId`) and the
 * sort key (`subject`) and prints the raw response as JSON.
 * Errors are logged rather than rethrown.
 */
async function queryPkSK() {
    // Values substituted for the `:name` placeholders in the key condition.
    const placeholderValues = {
        ':studentId': '9901',
        ':subject': 'Math',
    };

    try {
        const response = await ddbDocumentClient
            .query({
                TableName: tableName,
                KeyConditionExpression: 'studentId = :studentId AND subject = :subject',
                ExpressionAttributeValues: placeholderValues,
            })
            .promise();
        console.log(JSON.stringify(response));
    } catch (error) {
        console.error(error);
    }
}

queryPkSK();

查询GSI/LSI

TypeScript
// Query a secondary index: IndexName selects the index, and the key
// condition is written against the index's own key attributes.
const params = {
  TableName: 'YOUR_TABLE_NAME',
  IndexName: 'GSI1',
  KeyConditionExpression: 'GSIPK = :pk and GSISK = :sk',
  ExpressionAttributeValues: {
    ':pk': 'sport',
    ':sk': '2022',
  },
};
console.log(params);

// NOTE(review): `Dynamo` is not defined in this snippet; presumably a
// promise-returning wrapper around the document client — confirm.
const result = await Dynamo.query(params);

过滤FilterExpression

FilterExpression的作用是过滤Query/Scan的结果,因此这个操作是在Query/Scan读取完数据之后才发生的,被过滤掉的数据同样会消耗读取容量。需要注意的是,FilterExpression只是对查询结果的一个辅助处理,不应被过度使用;如果过度依赖它,往往说明表的设计有问题。

如果需要频繁使用FilterExpression,则应考虑使用GSI

示例代码:

Javascript
const AWS = require('aws-sdk');
const client = new AWS.DynamoDB.DocumentClient({apiVersion: '2012-08-10'});

// NOTE: `await` must run inside an async function (or an ES module);
// wrap this call accordingly when pasting it into a CommonJS script.
const results = await client.scan({
  TableName: 'SalesTable',
  // The whole condition is a single string; `:platinum` and `:song` are
  // placeholders resolved through ExpressionAttributeValues.
  FilterExpression: 'Sales >= :platinum AND begins_with(ProductName, :song)',
  ExpressionAttributeValues: {
    ':platinum': 1000000,
    ':song': 'SONG#'
  }
}).promise(); // scan() returns a request object; .promise() makes it awaitable

Scan一个表

可以通过Scan的方式返回表中所有数据,同时也可以加filter过滤。但Scan的效率较低,一般应避免使用。

Javascript
const AWS = require("aws-sdk");
AWS.config.update({ region: 'eu-west-1' });

const tableName = "ScoreTable";
const ddbDocumentClient = new AWS.DynamoDB.DocumentClient();

/**
 * Scans the table without any filter and prints the raw response as JSON.
 * Errors are logged rather than rethrown.
 */
async function scanTable() {
    try {
        const response = await ddbDocumentClient
            .scan({ TableName: tableName })
            .promise();
        console.log(JSON.stringify(response));
    } catch (error) {
        console.error(error);
    }
}

scanTable();

分页Pagination

默认情况下,DynamoDB返回数据尺寸是有限制的,1MB。因此这就有可能和pagination冲突,比如按照设定,应返回10条记录,但只需要8条记录就已经1MB了,这个时候后面的记录就无法返回了。为了解决这个问题,在DynamoDB中,有几个概念:

  • QueryPageSize:每页记录数
  • LastEvaluatedKey:这就是最近一次返回数据的指针。通过这个指针,就能准确定位到下一条记录。如果没有后续数据了,则响应中不会包含该字段(在JavaScript中其值为undefined)。
  • ExclusiveStartKey: 这就是本次返回数据的起始点。通过这种方式就能够准确定位本页中记录的起始位置。

几个注意事项:

  • 无法使用多线程同时获取不同页面,或者同一页面的不同数据。这主要是受限于LastEvaluatedKey。但在进行Scan的时候是可以这样做的。
  • 在API中使用pagination时,不要试图一次返回所有数据,最好按页分批返回数据。

简单的pagination示例:

Javascript
const AWS = require("aws-sdk");
AWS.config.update({region: 'eu-west-1'});
const tableName = "ScoreTable"
const ddbDocumentClient = new AWS.DynamoDB.DocumentClient();

/**
 * Collects every item for studentId '9901' by following the query's
 * pagination cursor until no further page is reported.
 *
 * @returns {Promise<Array>} all items from all pages, in the order returned
 */
async function getAllScores(){
    // Shared by every page request; never modified.
    const baseParams = {
        KeyConditionExpression: 'studentId = :studentId',
        ExpressionAttributeValues: {
            ':studentId': '9901'
        },
        TableName: tableName
    };

    // Builds the request for one page without mutating baseParams.
    const getData = (startKey) => {
        const pageParams = startKey
            ? Object.assign({}, baseParams, { ExclusiveStartKey: startKey })
            : baseParams
        return ddbDocumentClient.query(pageParams).promise()
    }

    const resultList = []
    let lastEvaluatedKey
    do {
        const page = await getData(lastEvaluatedKey)
        for (const item of page.Items) {
            resultList.push(item)
        }
        // Falsy when the last page has been reached, which ends the loop.
        lastEvaluatedKey = page.LastEvaluatedKey
    } while (lastEvaluatedKey)

    return resultList
}

getAllScores()
    .then(result => {
        console.log(result);
    })
    .catch(error => {
        // Without a handler a failed query would surface as an unhandled rejection.
        console.error(error);
    })

文章作者: 逻思
版权声明: 本博客所有文章除特別声明外,均采用 CC BY-NC-ND 4.0 许可协议。转载请注明来源 逻思 !