UNPKG

11.6 kBJavaScriptView Raw
1const Url = require('url');
2const Transport = require('./Transport'); // Base class for TransportXyz
3const Transports = require('./Transports'); // Manage all Transports that are loaded
4const errors = require('./Errors'); // Standard Dweb Errors
5const canonicaljson = require('@stratumn/canonicaljson');
6//const fluence = require('fluence');
7
8const debug = require('debug')('dweb-transports:fluence');
9
/**
 * Connection settings used when the caller supplies none.
 * `setup0` merges caller-supplied `options.fluence` with these.
 */
const defaultOptions = {
  // Address of the Fluence node to connect to
  nodeUrl: "https://ia-redis.fluence.one",
  // Port on that node
  nodePort: 443,
  // Fluence application id; '4' is the Redis app
  appId: "4",
};
15
/**
 * Decode a value read back from Redis.
 * Values are written by this transport as JSON text (see p_set / p_rawadd), so
 * strings are JSON-parsed; anything else (numbers, undefined, arrays) is
 * returned unchanged.
 */
function parseJsonIfString(value) {
  return typeof value === 'string' ? JSON.parse(value) : value;
}

/**
 * Transport that stores and retrieves data through a Redis application
 * reached via a Fluence session (`this.session.request(command)`).
 *
 * - Raw fetch uses Redis GET.
 * - Lists use Redis lists (RPUSH / LRANGE).
 * - Key-value tables use Redis hashes (HSET / HGET / HMGET / HKEYS / HGETALL / HDEL).
 *
 * NOTE(review): Redis commands are built by plain string interpolation, so keys
 * or values containing whitespace are presumably split by the server - confirm
 * how the Fluence Redis app tokenises commands before relying on such values.
 */
class TransportFLUENCE extends Transport {

  /**
   * @param {object} options - Merged options (nodeUrl, nodePort, appId); see setup0.
   */
  constructor(options) {
    super(options);
    this.options = options; // Dictionary of options
    this.session = undefined; // Set by p_setup1 once connected
    this.name = 'FLUENCE'; // For console log etc
    this.supportURLs = ['fluence'];
    // NOTE(review): p_delete is implemented below but 'delete' is not listed here - confirm whether that is intentional.
    this.supportFunctions = [
      // General data functions
      'fetch', // p_rawfetch(url, {timeoutMS, start, end, relay}) – Fetch some bytes based on a url

      // Lists functions
      'list', // p_rawlist(url) – Fetch all the objects in a list .. identified by the url of the .. 'signedby' parameter of the p_rawadd call
      'add', // p_rawadd(url, sig) – Store a new list item, it should be stored so that it can be retrieved either by "signedby" (using p_rawlist) or by "url" (with p_rawreverse).
      'newlisturls', // p_newlisturls(cl) – Obtain a pair of URLs for a new list

      // KeyValueTable functions
      'newdatabase', // p_newdatabase(pubkey) – Create a new database based on some existing object
      'newtable', // p_newtable(pubkey, table) – Create a new table
      'get', // p_get(url, keys) – Get one or more keys from a table
      'set', // p_set(url, keyvalues, value) – Set one or more keys in a table.
      'getall', // p_getall(url) – Return a dictionary representing the table
      'keys', // p_keys(url) – Return a list of keys in a table (suitable for iterating through)
    ];
    this.supportFeatures = [];
    this.status = Transport.STATUS_LOADED;
  }

  /**
   * Synchronous first setup stage: merge options, create the transport and
   * register it with Transports.
   *
   * @param {object} options - Global options; `options.fluence` overrides the defaults.
   * @returns {TransportFLUENCE} The newly created (not yet connected) transport.
   */
  static setup0(options) {
    const combinedOptions = Transport.mergeoptions(defaultOptions, options.fluence);

    console.assert(combinedOptions.nodeUrl, 'Fluence Node url should be specified');
    console.assert(combinedOptions.nodePort, 'Fluence Node port should be specified');
    console.assert(combinedOptions.appId, 'Fluence AppId should be specified');

    const transport = new TransportFLUENCE(combinedOptions);
    Transports.addtransport(transport);
    return transport;
  }

  /**
   * Second setup stage: open the Fluence session.
   * Failures are logged and recorded as STATUS_FAILED rather than thrown.
   *
   * @param {function} [cb] - Called with this transport after the status
   *   becomes STATUS_STARTING and again once the attempt has finished.
   * @returns {Promise<TransportFLUENCE>} This transport.
   */
  async p_setup1(cb) {
    try {
      this.status = Transport.STATUS_STARTING;
      debug('connecting...');

      if (cb) cb(this);

      // Random identifier passed to directConnect as its fourth argument.
      const rndString = Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15);
      // `fluence` is not required in this file; it is presumably provided as a
      // global by the loader (see TransportFLUENCE.scripts / requires) - verify.
      // If it is missing, the ReferenceError lands in the catch below.
      this.session = fluence.directConnect(this.options.nodeUrl, this.options.nodePort, this.options.appId, rndString);

      debug('connected.');

      this.status = Transport.STATUS_CONNECTED;
    } catch (err) {
      console.error(this.name, 'failed to start', err);
      this.status = Transport.STATUS_FAILED;
    }

    if (cb) cb(this);
    return this;
  }

  /**
   * Refresh the status: an existing session counts as connected.
   * The rest is delegated to the base class.
   */
  async p_status() {
    if (this.session !== null && this.session !== undefined) {
      this.status = Transport.STATUS_CONNECTED;
    }

    return super.p_status();
  }

  /**
   * Parse a url and check that it uses the `fluence:` protocol.
   *
   * @param {string} url
   * @returns {object} Result of Url.parse(url).
   * @throws {errors.TransportError} If the protocol is not `fluence:`.
   */
  parseUrl(url) {
    const parsedUrl = Url.parse(url);
    if (parsedUrl.protocol !== 'fluence:') {
      throw new errors.TransportError(`TransportFLUENCE Error encountered retrieving val: url (${parsedUrl.href}) is not a valid FLUENCE url | protocol = ${parsedUrl.protocol}`);
    }

    debug('retrieve url', parsedUrl.href);

    return parsedUrl;
  }

  /**
   * Build the url under which data for a public key is stored.
   *
   * @param {string|object} pubkey - Exported public key, or an object with its
   *   own `keypair` property whose `signingexport()` yields that export.
   * @returns {string} `fluence:/fluence/<url-encoded pubkey>`
   */
  newKey(pubkey) {
    // Call through the prototype so objects that shadow or lack hasOwnProperty still work.
    if (Object.prototype.hasOwnProperty.call(pubkey, 'keypair')) {
      pubkey = pubkey.keypair.signingexport();
    }
    // By this point pubkey should be an export of a public key of form xyz:abc where xyz
    // specifies the type of public key (NACL VERIFY being the only kind we expect currently)
    return `fluence:/fluence/${encodeURIComponent(pubkey)}`;
  }

  /**
   * Decode a Redis (RESP) reply into a JavaScript value.
   *
   *   +text     -> string
   *   :number   -> number (may be negative)
   *   $n bulk   -> string, or undefined for the null bulk string ($-1)
   *   *n array  -> array of decoded items (may be nested), or undefined for *-1
   *   -message  -> throws
   *
   * @param {string} result - Raw reply text using \r\n line terminators.
   * @returns {*} The decoded value.
   * @throws {errors.TransportError} On an error reply, an unknown type marker
   *   or a malformed length/integer header.
   */
  parseRedisResponse(result) {
    // Decode the item starting at `start`; returns its value and the index just past it.
    const parsePart = (start) => {
      const type = result[start];
      const eol = result.indexOf('\r\n', start);
      // Tolerate a missing terminator on the final line.
      const headerEnd = eol === -1 ? result.length : eol;
      const header = result.substring(start + 1, headerEnd);
      const next = headerEnd + 2;

      const readInteger = () => {
        if (!/^-?[0-9]+$/.test(header)) {
          throw new errors.TransportError(`TransportFLUENCE Error malformed Redis response header "${type}${header}", response: ${result}`);
        }
        return Number(header);
      };

      switch (type) {
        case '+': // Simple string
          return { data: header, next };
        case '-': // Error reply
          throw new errors.TransportError(`TransportFLUENCE Error Redis returned an error: ${header}`);
        case ':': // Integer
          return { data: readInteger(), next };
        case '$': { // Bulk string
          const length = readInteger();
          if (length < 0) {
            // Null bulk string: no payload follows the header line.
            return { data: undefined, next };
          }
          // NOTE(review): RESP lengths are byte counts, but this slices by string
          // index; payloads with multi-byte characters may be mis-sliced
          // depending on how asString() decodes - confirm.
          return {
            data: result.substring(next, next + length),
            next: next + length + 2,
          };
        }
        case '*': { // Array
          const count = readInteger();
          if (count < 0) {
            return { data: undefined, next };
          }
          const list = [];
          let cursor = next;
          for (let i = 0; i < count; i++) {
            const item = parsePart(cursor);
            list.push(item.data);
            cursor = item.next;
          }
          // Report where the array ends so enclosing arrays keep parsing correctly.
          return { data: list, next: cursor };
        }
        default:
          throw new errors.TransportError(`TransportFLUENCE Error unsupprted Redis response type: ${type}, response: ${result}`);
      }
    };

    return parsePart(0).data;
  }

  // General data functions (uses Redis basic GET\SET)

  /**
   * Fetch the value stored under a url (Redis GET on the url's path).
   *
   * @param {string} url - fluence: url.
   * @returns {Promise<*>} The stored value, JSON-decoded when it is a string.
   * @throws {errors.TransportError} If the url is not a fluence url or nothing is stored.
   */
  async p_rawfetch(url) {
    const parsedUrl = this.parseUrl(url);
    const key = parsedUrl.path;

    const result = await this.session.request(`GET ${key}`);
    const data = this.parseRedisResponse(result.asString());

    if (!data) {
      // `url` is a string, so the parsed form supplies the href for the message.
      throw new errors.TransportError(`TransportFLUENCE unable to retrieve: ${parsedUrl.href}`);
    }

    return parseJsonIfString(data);
  }

  // List functions (uses Redis list)

  /**
   * Fetch every item of the list stored under a url (Redis LRANGE 0 -1).
   *
   * @param {string} url - fluence: url.
   * @returns {Promise<Array>} List items, each JSON-decoded when it is a string.
   * @throws {errors.TransportError} If the url is not a fluence url or no list is returned.
   */
  async p_rawlist(url) {
    const parsedUrl = this.parseUrl(url);
    const key = parsedUrl.path;

    const result = await this.session.request(`LRANGE ${key} 0 -1`);
    const data = this.parseRedisResponse(result.asString());

    if (!data) {
      throw new errors.TransportError(`TransportFLUENCE unable to retrieve list: ${parsedUrl.href}`);
    }

    return data.map(parseJsonIfString);
  }

  /**
   * Append a signature to the list stored under a url (Redis RPUSH).
   *
   * @param {string} url - fluence: url of the list.
   * @param {object} sig - Object with a `preflight` method; the canonical JSON
   *   of `sig.preflight(<shallow copy of sig>)` is what gets stored.
   */
  async p_rawadd(url, sig) {
    const parsedUrl = this.parseUrl(url);
    const key = parsedUrl.path;

    const data = canonicaljson.stringify(sig.preflight(Object.assign({}, sig)));

    await this.session.request(`RPUSH ${key} ${data}`);
  }

  /**
   * Obtain the pair of urls for a new list; both are the same key url.
   *
   * @param {string|object} cl - Passed to newKey.
   * @returns {Promise<string[]>} [url, url]
   */
  async p_newlisturls(cl) {
    const key = this.newKey(cl);

    return [key, key];
  }

  // KeyValueTable functions (uses Redis hashes)

  /**
   * Request a new database.
   *
   * @param {string|object} pubkey - Passed to newKey.
   * @returns {Promise<object>} { publicurl: "fluence:/fluence/<publickey>", privateurl: "fluence:/fluence/<publickey>" }
   */
  async p_newdatabase(pubkey) {
    const key = this.newKey(pubkey); // newKey is synchronous
    return {
      publicurl: key,
      privateurl: key,
    };
  }

  /**
   * Request a new table.
   *
   * @param {string|object} pubkey - Passed to newKey; required.
   * @param {string} table - Table name appended to the database url.
   * @returns {Promise<object>} { publicurl: "fluence:/fluence/<publickey>/<table>", privateurl: "fluence:/fluence/<publickey>/<table>" }
   * @throws {errors.CodingError} If pubkey is missing.
   */
  async p_newtable(pubkey, table) {
    if (!pubkey) {
      throw new errors.CodingError("p_newtable currently requires a pubkey");
    }

    const { publicurl, privateurl } = await this.p_newdatabase(pubkey);
    return {
      privateurl: `${privateurl}/${table}`,
      publicurl: `${publicurl}/${table}`,
    };
  }

  /**
   * Set key values in a table (Redis HSET; the url itself is the hash name).
   *
   * @param {string} url - fluence:/fluence/<publickey>/<table>
   * @param {string|object} keyvalues - A single key, in which case `value` is
   *   stored under it, or an object of key/value pairs, in which case `value`
   *   is ignored.
   * @param {*} [value] - Value for the single-key form; stored as canonical JSON.
   */
  async p_set(url, keyvalues, value) {
    if (typeof keyvalues === 'string') {
      await this.session.request(`HSET ${url} ${keyvalues} ${canonicaljson.stringify(value)}`);
      return;
    }

    // Store all key-value pairs without destroying any other key/value pairs previously set
    console.assert(!Array.isArray(keyvalues), 'TransportFLUENCE - shouldnt pass an array as the keyvalues');

    await Promise.all(
      Object.keys(keyvalues).map(hKey => this.session.request(`HSET ${url} ${hKey} ${canonicaljson.stringify(keyvalues[hKey])}`))
    );
  }

  /**
   * Get one or more keys from a table.
   *
   * @param {string} url - Table url (used as the hash name).
   * @param {string|string[]} keys - One key (HGET) or a list of keys (HMGET).
   * @returns {Promise<*>} For a single key, its decoded value (undefined when
   *   absent); for a list, an object mapping each requested key to its decoded
   *   value, with undefined for keys that are not set.
   */
  async p_get(url, keys) {
    if (Array.isArray(keys)) {
      const result = await this.session.request(`HMGET ${url} ${keys.join(' ')}`);
      // A null array reply decodes to undefined; treat it as "nothing found".
      const values = this.parseRedisResponse(result.asString()) || [];

      const store = {};
      keys.forEach((key, index) => {
        store[key] = parseJsonIfString(values[index]);
      });
      return store;
    }

    const result = await this.session.request(`HGET ${url} ${keys}`);
    return parseJsonIfString(this.parseRedisResponse(result.asString()));
  }

  /**
   * List the keys of a table (Redis HKEYS).
   *
   * @param {string} url - Table url (used as the hash name).
   * @returns {Promise<*>} The decoded HKEYS reply.
   */
  async p_keys(url) {
    const result = await this.session.request(`HKEYS ${url}`);

    return this.parseRedisResponse(result.asString());
  }

  /**
   * Return the whole table as an object (Redis HGETALL).
   *
   * @param {string} url - Table url (used as the hash name).
   * @returns {Promise<object>} Mapping of every key to its decoded value.
   */
  async p_getall(url) {
    const result = await this.session.request(`HGETALL ${url}`);
    const flat = this.parseRedisResponse(result.asString()) || [];

    // The reply alternates key, value, key, value, ...
    const store = {};
    for (let i = 0; i < flat.length; i += 2) {
      store[flat[i]] = parseJsonIfString(flat[i + 1]);
    }
    return store;
  }

  /**
   * Delete one or more keys from a table (Redis HDEL).
   *
   * @param {string} url - Table url (used as the hash name).
   * @param {string|string[]} keys - A single key or a list of keys.
   */
  async p_delete(url, keys) {
    const fields = Array.isArray(keys) ? keys.join(' ') : keys;
    await this.session.request(`HDEL ${url} ${fields}`);
  }
}
310
// Make the class discoverable by name through the Transports registry.
Transports._transportclasses.FLUENCE = TransportFLUENCE;

// Script bundle and module name recorded on the class for whatever loads the fluence library.
TransportFLUENCE.scripts = ['fluence@0.3.14-no-webj/bundle/bundle.js'];
TransportFLUENCE.requires = { fluence: 'fluence' };

// Export the class as the module itself and keep the local `exports` alias pointing at it.
module.exports = TransportFLUENCE;
exports = module.exports;