1 | "use strict";
|
2 | const AWS = require("aws-sdk");
|
3 | const BbPromise = require("bluebird");
|
4 | const _ = require("lodash");
|
5 | const path = require("path");
|
6 | const fs = require("fs");
|
7 |
|
8 |
|
9 |
|
// Maximum seeds per batch write — matches DynamoDB's documented
// BatchWriteItem limit of 25 put/delete requests per call.
const MAX_MIGRATION_CHUNK = 25;

// Maximum number of seed batches written to DynamoDB in parallel.
const MIGRATION_SEED_CONCURRENCY = 5;
|
14 |
|
15 |
|
16 |
|
17 |
|
18 |
|
19 |
|
20 |
|
21 |
|
/**
 * Writes a single batch of seed records to a DynamoDB table.
 *
 * If the table is not yet available (ResourceNotFoundException — e.g. it is
 * still being created), the write is retried with a linearly increasing
 * delay (0ms, 1000ms, 2000ms, ... up to 5000ms) before giving up.
 *
 * @param {Function} dynamodbWriteFunction - callback-style batch write,
 *   invoked as dynamodbWriteFunction(params, callback)
 * @param {string} tableName - name of the target DynamoDB table
 * @param {Object[]} seeds - seed records to put (at most 25, the
 *   BatchWriteItem limit)
 * @return {Promise} resolves when the batch is written, rejects on any
 *   non-retryable error
 */
function writeSeedBatch(dynamodbWriteFunction, tableName, seeds) {
  const params = {
    RequestItems: {
      [tableName]: seeds.map((seed) => ({
        PutRequest: {
          Item: seed,
        },
      })),
    },
  };
  return new BbPromise((resolve, reject) => {
    // `interval` is the delay before the next attempt; it grows by 1s per
    // retry. (The original code also declared an outer `let interval = 0`
    // that was dead — shadowed by this parameter — so it has been removed.)
    function execute(interval) {
      setTimeout(() => dynamodbWriteFunction(params, (err) => {
        if (err) {
          if (err.code === "ResourceNotFoundException" && interval <= 5000) {
            execute(interval + 1000);
          } else {
            reject(err);
          }
        } else {
          resolve();
        }
      }), interval);
    }
    execute(0);
  });
}
|
52 |
|
53 |
|
54 |
|
55 |
|
56 |
|
57 |
|
58 |
|
/**
 * Writes all seed records to a DynamoDB table, chunked into batches of
 * MAX_MIGRATION_CHUNK with at most MIGRATION_SEED_CONCURRENCY batches in
 * flight at once.
 *
 * @param {Function} dynamodbWriteFunction - callback-style batch write,
 *   passed through to writeSeedBatch
 * @param {string} tableName - name of the target DynamoDB table
 * @param {Object[]} seeds - seed records to write (may be empty)
 * @throws {Error} when any required argument is missing
 * @return {Promise} resolves when every batch has been written
 */
function writeSeeds(dynamodbWriteFunction, tableName, seeds) {
  if (!dynamodbWriteFunction) {
    throw new Error("dynamodbWriteFunction argument must be provided");
  }
  if (!tableName) {
    throw new Error("table name argument must be provided");
  }
  if (!seeds) {
    throw new Error("seeds argument must be provided");
  }

  if (seeds.length === 0) {
    // Nothing to write. Previously this returned undefined, which broke
    // callers chaining .then(); always return a promise instead.
    return BbPromise.resolve();
  }

  const seedChunks = _.chunk(seeds, MAX_MIGRATION_CHUNK);
  return BbPromise.map(
    seedChunks,
    (chunk) => writeSeedBatch(dynamodbWriteFunction, tableName, chunk),
    { concurrency: MIGRATION_SEED_CONCURRENCY }
  )
    .then(() => console.log("Seed running complete for table: " + tableName));
}
|
80 |
|
81 |
|
82 |
|
83 |
|
84 |
|
/**
 * Checks whether a file exists at the given path.
 *
 * Uses fs.access instead of the deprecated fs.exists; any access error
 * (missing file, no permission) resolves to false — this never rejects.
 *
 * @param {string} fileName - path to check
 * @return {Promise<boolean>} true when the file is accessible
 */
function fileExists(fileName) {
  return new BbPromise((resolve) => {
    fs.access(fileName, (err) => resolve(!err));
  });
}
|
90 |
|
91 |
|
92 |
|
93 |
|
94 |
|
95 |
|
96 |
|
/**
 * Restores JSON-serialized Node Buffers in a seed record, in place.
 *
 * JSON.stringify turns a Buffer into { type: "Buffer", data: [...] };
 * this converts such values back into real Buffer instances.
 *
 * @param {Object} json - seed record; mutated in place
 * @return {Object} the same record, with Buffer values restored
 */
function unmarshalBuffer(json) {
  _.forEach(json, function(value, key) {
    // `!= null` skips both null and undefined before probing .type
    // (the old `!== null` check crashed on undefined values).
    // Buffer.from replaces the deprecated `new Buffer(...)` constructor.
    if (value != null && value.type === "Buffer") {
      json[key] = Buffer.from(value.data);
    }
  });
  return json;
}
|
106 |
|
107 |
|
108 |
|
109 |
|
110 |
|
111 |
|
112 |
|
113 |
|
/**
 * Loads seed records from a module at the given path and restores any
 * JSON-serialized Buffer values inside each record.
 *
 * @param {string} location - absolute path of the seed module to require
 * @return {Object[]} array of seed records (a single object is wrapped)
 */
function getSeedsAtLocation(location) {
  const loaded = require(location);
  // An array of seeds is processed in place; a lone seed object is
  // wrapped so callers always receive an array.
  return Array.isArray(loaded)
    ? _.forEach(loaded, unmarshalBuffer)
    : [ unmarshalBuffer(loaded) ];
}
|
125 |
|
126 |
|
127 |
|
128 |
|
129 |
|
/**
 * Resolves seed source names against a base directory, verifies each file
 * exists, and loads every seed into one flat array.
 *
 * @param {string[]} [sources] - seed file names relative to cwd
 * @param {string} [cwd] - base directory (defaults to process.cwd())
 * @return {Promise<Object[]>} all seeds from all sources, flattened
 */
function locateSeeds(sources, cwd) {
  const sourceList = sources || [];
  const baseDir = cwd || process.cwd();

  const locations = sourceList.map((source) => path.join(baseDir, source));
  return BbPromise.map(locations, (location) =>
    fileExists(location).then((exists) => {
      if (!exists) {
        throw new Error("source file " + location + " does not exist");
      }
      return getSeedsAtLocation(location);
    })
  ).then((seedArrays) =>
    // Flatten one level: each source yields an array of seeds.
    seedArrays.reduce((all, seeds) => all.concat(seeds), [])
  );
}
|
145 |
|
// Public API: writeSeeds pushes seed records into a DynamoDB table;
// locateSeeds resolves and loads seed source files from disk.
module.exports = { writeSeeds, locateSeeds };
|