All files / node-collections-boilerplate-nahid/storage DynamoStorage.js

93.02% Statements 40/43
66.67% Branches 12/18
100% Functions 11/11
93.02% Lines 40/43
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158    1x 1x   1x 1x 1x                               38x   38x   38x                     38x 38x     19x   19x           38x   19x   38x           63x 63x   63x       63x     63x         63x   63x           63x 63x           63x   1018x   63x         63x             410x           4x       4x     4x     2x         2x           412x         412x           412x       412x     412x         1x  
"use strict";
 
const AWS = require('aws-sdk');
const Storage = require('./Storage');
 
const dynamoDecodeRecord = require('./dynamoDecodeRecord');
const dynamoEncodeRecord = require('./dynamoEncodeRecord');
const StreamsNotifier = require('./DynamoStorageStreamNotifier');
 
/**
 * Storage backend that keeps the records of a collection in an AWS DynamoDB
 * table.
 *
 * Mostly kept around because some legacy data still lives in DynamoDB.
 *
 * Needs the ```aws-sdk``` package.
 *
 * NOTE(review): `connectionString`, `collectionName` and `primaryKey` are read
 * from `this` but never assigned here - presumably the `Storage` base class
 * sets them from the options; confirm against `./Storage`.
 */
class DynamoStorage extends Storage
{
  /**
   * @param {StorageOptions} options see fields
   * @param {string} [options.region] AWS region handed to the driver
   * @param {boolean} [options.useStreams] use streams for receiving data updates
   */
  constructor(options = {})
  {
    super(options);
    /** AWS region */
    this.region = options.region || undefined;
    /** reference to driver object */
    this.db = new AWS.DynamoDB({
      apiVersion: '2012-08-10',
      maxRetries: 99 * 1024,
      // Only override the endpoint when a connection string was configured.
      endpoint: this.connectionString ? new AWS.Endpoint(this.connectionString) : undefined,
      region: this.region
    });

    /**
     * Set it to true to use streams for receiving data updates.
     * @type {boolean}
     */
    this.useStreams = options.useStreams || false;
    if (this.useStreams)
    {
      /** @private */
      this.notifier = new StreamsNotifier(this);
      /** @private */
      this.updateCheckImpl = this.notifier.updateCheck.bind(this.notifier);
    }
  }

  /**
   * Connects the streams notifier first (when streams are enabled) and then
   * runs the base class connect.
   *
   * @returns {Promise<void>}
   */
  async connect()
  {
    if (this.notifier)
    {
      await this.notifier.connect();
    }
    await super.connect();
  }

  /**
   * Scans the whole table, following `LastEvaluatedKey` until the scan is
   * exhausted, and decodes every item with `dynamoDecodeRecord`.
   *
   * @override
   * @returns {Promise<Array>} decoded records in scan order; rejects with the
   *   driver error (or a decoding error) when any step fails
   */
  async readAllRecords()
  {
    const records = [];
    let startKey;

    do
    {
      const request = {
        TableName: this.collectionName,
      };
      if (startKey)
      {
        // Continue where the previous page stopped.
        request.ExclusiveStartKey = startKey;
      }

      const page = await this.db.scan(request)
        .promise();

      for (const item of page.Items)
      {
        records.push(dynamoDecodeRecord(item, this.primaryKey));
      }
      startKey = page.LastEvaluatedKey;
    }
    while (startKey);

    return records;
  }

  /**
   * Creating is the same write as updating: the item is put as a whole.
   *
   * @override
   * @param {object} record record to store
   * @returns {Promise<object>} the record that was passed in
   */
  createRecord(record)
  {
    return this.updateRecord(record);
  }

  /**
   * Fetches a single item by primary key. The key value is sent as a
   * DynamoDB string ("S") attribute.
   *
   * @override
   * @param {object} record object carrying the primary key field
   * @returns {Promise<*>} whatever `dynamoDecodeRecord` makes of the item
   */
  async readRecord(record)
  {
    const query = {
      TableName: this.collectionName,
      Key: {
        [this.primaryKey]: { "S": record[this.primaryKey] }
      }
    };
    const data = await this.db.getItem(query)
      .promise();

    // NOTE(review): a missing `data.Item` is deliberately not turned into an
    // error here; it is handed to the decoder as is. Confirm the decoder
    // copes with an absent item.
    return dynamoDecodeRecord(data.Item, this.primaryKey);
  }

  /**
   * Writes the encoded record with `putItem`.
   *
   * @override
   * @param {object} record record to store
   * @returns {Promise<object>} the record that was passed in
   */
  async updateRecord(record)
  {
    await this.db.putItem({
        TableName: this.collectionName,
        Item: dynamoEncodeRecord(record, this.primaryKey)
      })
      .promise();
    return record;
  }

  /**
   * Deletes a single item by primary key. The key value is sent as a
   * DynamoDB string ("S") attribute.
   *
   * @override
   * @param {object} record object carrying the primary key field
   * @returns {Promise<object>} the driver's `deleteItem` response
   */
  async deleteRecord(record)
  {
    const query = {
      TableName: this.collectionName,
      Key: {
        [this.primaryKey]: { "S": record[this.primaryKey] }
      }
    };
    return this.db.deleteItem(query)
      .promise();
  }
}
 
module.exports = DynamoStorage;