Home Reference Source

node-collections-boilerplate-nahid/storage/DynamoStorageStreamNotifier.js

const AWS = require('aws-sdk');

const dynamoDecodeRecord = require('./dynamoDecodeRecord');

/**
 * Implementation of the DynamoDB Streams notification handler.
 *
 * Reads change records from the stream of the storage's table and re-emits
 * them on the storage object as 'create', 'update' and 'delete' events.
 *
 * Do not use directly. See the useStreams option in DynamodbStorage.
 */
class DynamoStorageStreamNotifier
{
  /**
   * @param {Object} storage - Owning storage. This class reads its `region`,
   *   `collectionName` and `primaryKey` properties and calls its `emit`
   *   method.
   */
  constructor(storage)
  {
    this.storage = storage;
    this.stream = new AWS.DynamoDBStreams({
      apiVersion: '2012-08-10',
      region: storage.region
    });
  }

  /**
   * Looks up the stream of the table, selects the last shard listed for it
   * and obtains a 'LATEST' shard iterator for that shard.
   *
   * On success `streamArn`, `shardId` and `shardIterator` are set on this
   * instance.
   *
   * @returns {Promise<void>}
   * @throws {Error} If the table has no stream, or the stream has no shards.
   */
  async connect()
  {
    // extract StreamArn with ListStreams
    const listed = await this.stream.listStreams({
        TableName: this.storage.collectionName
      })
      .promise();
    const streams = listed.Streams || [];
    if (streams.length === 0)
    {
      throw new Error(`Stream is not found for table '${this.storage.collectionName}'.`);
    }
    this.streamArn = streams[0].StreamArn;

    // find the latest shard id: page through the shard list and remember the
    // last shard seen, so an empty final page does not lose earlier shards
    let lastEvaluatedShardId = undefined;
    let latestShard;
    do {
      const description = (await this.stream.describeStream({
            StreamArn: this.streamArn,
            ExclusiveStartShardId: lastEvaluatedShardId
          })
          .promise())
        .StreamDescription;
      const shards = description.Shards || [];
      if (shards.length > 0)
      {
        latestShard = shards[shards.length - 1];
      }
      lastEvaluatedShardId = description.LastEvaluatedShardId;
    } while (lastEvaluatedShardId);
    if (!latestShard)
    {
      // fail with a descriptive error instead of a TypeError on `undefined`
      throw new Error(`No shards found in stream for table '${this.storage.collectionName}'.`);
    }
    this.shardId = latestShard.ShardId;

    // find the latest shard iterator
    const iterator = await this.stream.getShardIterator({
        StreamArn: this.streamArn,
        ShardId: this.shardId,
        ShardIteratorType: 'LATEST'
      })
      .promise();
    this.shardIterator = iterator.ShardIterator;
  }

  /**
   * Reads all currently available records and emits them on the storage:
   * INSERT as 'create' and MODIFY as 'update' (both with the decoded new
   * image), REMOVE as 'delete' (with the decoded keys).
   *
   * Keeps reading until a batch comes back empty. When the current shard has
   * no further iterator, moves on to a following shard, preferring the shard
   * whose parent is the exhausted one.
   *
   * @returns {Promise<void>}
   * @throws {Error} If connect() has not resolved a stream yet.
   */
  async updateCheck()
  {
    if (!this.streamArn)
    {
      throw new Error('Stream is not connected; call connect() before updateCheck().');
    }

    // loop instead of recursing so that a long backlog does not build up a
    // chain of nested pending calls
    for (;;)
    {
      if (!this.shardIterator)
      {
        // get next shard iterator
        const description = (await this.stream.describeStream({
              StreamArn: this.streamArn,
              ExclusiveStartShardId: this.shardId
            })
            .promise())
          .StreamDescription;
        const shards = description.Shards || [];
        if (shards.length === 0)
        {
          // nothing follows the current shard yet; retry on the next call
          return;
        }
        // prefer the child of the exhausted shard; the first listed shard may
        // belong to a different lineage
        const child = shards.find(shard => shard.ParentShardId === this.shardId);
        this.shardId = (child || shards[0]).ShardId;
        this.shardIterator = (await this.stream.getShardIterator({
              StreamArn: this.streamArn,
              ShardId: this.shardId,
              ShardIteratorType: 'TRIM_HORIZON'
            })
            .promise())
          .ShardIterator;
      }

      const records = await this.stream.getRecords({
          ShardIterator: this.shardIterator
        })
        .promise();
      const batch = records.Records || [];
      for (const record of batch)
      {
        switch (record.eventName)
        {
        case 'INSERT':
          this.storage.emit('create', dynamoDecodeRecord(record.dynamodb.NewImage, this.storage.primaryKey));
          break;
        case 'MODIFY':
          this.storage.emit('update', dynamoDecodeRecord(record.dynamodb.NewImage, this.storage.primaryKey));
          break;
        case 'REMOVE':
          this.storage.emit('delete', dynamoDecodeRecord(record.dynamodb.Keys, this.storage.primaryKey));
          break;
        default:
          console.log('[ERROR] TODO: process', record);
        }
      }

      // a missing NextShardIterator leaves shardIterator unset, which makes
      // the next pass (or the next call) look for a following shard
      this.shardIterator = records.NextShardIterator;
      if (batch.length === 0)
      {
        return;
      }
    }
  }
}

// The notifier class is this module's only export.
module.exports = DynamoStorageStreamNotifier;

/*
(async function()
{
  const notifier = new DynamoStorageStreamNotifier({
    region: 'ap-southeast-2',
    collectionName: 'test2',
    primaryKey: 'id',
    emit: console.log.bind(console)
  });
  await notifier.connect();

  async function checkForUpdate()
  {
    await notifier.updateCheck();
    setTimeout(checkForUpdate, 1000)
  }

  checkForUpdate()

})();
*/