All files / node-collections-boilerplate-nahid/storage DynamoStorageStreamNotifier.js

91.89% Statements 34/37
71.43% Branches 10/14
100% Functions 3/3
91.89% Lines 34/37
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 1411x   1x                     19x 19x                 19x         19x       19x     19x   19x 19x           19x   19x 19x 19x     19x             19x         23x     2x           2x   2x 2x                         23x       23x   205x     203x 203x   1x 1x   1x 1x         23x 23x   8x         1x                                              
const AWS = require('aws-sdk');
 
const dynamoDecodeRecord = require('./dynamoDecodeRecord');
 
/**
 * Implmentation of streams notification handler.
 *
 * Do not use directly. see useStreams option in DynamodbStorage.
 */
class DynamoStorageStreamNotifier
{
  constructor(storage)
  {
    this.storage = storage;
    this.stream = new AWS.DynamoDBStreams({
      apiVersion: '2012-08-10',
      region: storage.region
    });
  }
 
  async connect()
  {
    // extract StreamArn with ListStreams
    const streams = (await this.stream.listStreams({
          TableName: this.storage.collectionName
        })
        .promise())
      .Streams || [];
    Iif (streams.length === 0)
    {
      throw new Error(`Stream is not found for table '${this.storage.collectionName}'.`);
    }
    this.streamArn = streams[0].StreamArn;
 
    // find the latest shard id
    let lastEvaluatedShardId = undefined;
    let stream;
    do {
      stream = (await this.stream.describeStream({
            StreamArn: this.streamArn,
            ExclusiveStartShardId: lastEvaluatedShardId
          })
          .promise())
        .StreamDescription;
      lastEvaluatedShardId = stream.LastEvaluatedShardId;
    } while (stream.LastEvaluatedShardId);
    let shards = stream.Shards;
    let shard = shards[shards.length - 1];
    this.shardId = shard.ShardId;
 
    // find the latest shard iterator
    let iterator = await this.stream.getShardIterator({
        StreamArn: this.streamArn,
        ShardId: this.shardId,
        ShardIteratorType: 'LATEST'
        //ShardIteratorType: 'TRIM_HORIZON'
      })
      .promise();
    this.shardIterator = iterator.ShardIterator;
  }
 
  async updateCheck()
  {
    if (!this.shardIterator)
    {
      // get next shard iterator
      const shards = (await this.stream.describeStream({
            StreamArn: this.streamArn,
            ExclusiveStartShardId: this.shardId
          })
          .promise())
        .StreamDescription.Shards;
      Eif (shards.length > 0)
      {
        this.shardId = shards[0].ShardId;
        this.shardIterator = (await this.stream.getShardIterator({
              StreamArn: this.streamArn,
              ShardId: this.shardId,
              ShardIteratorType: 'TRIM_HORIZON'
            })
            .promise())
          .ShardIterator;
      }
      else
      {
        return;
      }
    }
    const records = await this.stream.getRecords({
        ShardIterator: this.shardIterator
      })
      .promise();
    for (let record of records.Records)
    {
      switch (record.eventName)
      {
      case 'INSERT':
        this.storage.emit('create', dynamoDecodeRecord(record.dynamodb.NewImage, this.storage.primaryKey));
        break;
      case 'MODIFY':
        this.storage.emit('update', dynamoDecodeRecord(record.dynamodb.NewImage, this.storage.primaryKey));
        break;
      case 'REMOVE':
        this.storage.emit('delete', dynamoDecodeRecord(record.dynamodb.Keys, this.storage.primaryKey));
        break;
      default:
        console.log('[ERROR] TODO: process', record);
      }
    }
    this.shardIterator = records.NextShardIterator;
    if (records.Records.length > 0)
    {
      await this.updateCheck();
    }
  }
}
 
module.exports = DynamoStorageStreamNotifier;
 
/*
(async function()
{
  const notifier = new Notifier({
    region: 'ap-southeast-2',
    collectionName: 'test2',
    primaryKey: 'id',
    emit: console.log.bind(console)
  });
  await notifier.initialise();
 
  async function checkForUpdate()
  {
    await notifier.checkForUpdate();
    setTimeout(checkForUpdate, 1000)
  }
 
  checkForUpdate()
 
})();
*/