1 | const Url = require('url');
|
2 | const Transport = require('./Transport');
|
3 | const Transports = require('./Transports');
|
4 | const errors = require('./Errors');
|
5 | const canonicaljson = require('@stratumn/canonicaljson');
|
6 |
|
7 |
|
8 | const debug = require('debug')('dweb-transports:fluence');
|
9 |
|
// Fallback connection settings; setup0() merges caller-supplied
// `options.fluence` with these via Transport.mergeoptions.
const defaultOptions = {
  // Address of the Fluence node passed to fluence.directConnect.
  nodeUrl: 'https://ia-redis.fluence.one',
  // Port passed to fluence.directConnect.
  nodePort: 443,
  // Application id passed to fluence.directConnect (kept as a string).
  appId: '4',
};
|
15 |
|
/**
 * Decode a value returned by the Redis parser: strings are treated as JSON
 * documents, anything else (numbers, arrays, undefined) is passed through.
 *
 * @param {*} value - A parsed Redis reply element.
 * @returns {*} The JSON-decoded value, or `value` unchanged when it is not a string.
 * @throws {SyntaxError} When a string value is not valid JSON.
 */
function parseJsonIfString(value) {
  return typeof value === 'string' ? JSON.parse(value) : value;
}

/**
 * Transport for `fluence:` URLs. Every operation is sent as a Redis-style
 * text command through `this.session.request(...)` and the textual reply is
 * decoded by `parseRedisResponse`.
 */
class TransportFLUENCE extends Transport {
  /**
   * @param {object} options - Connection options; `nodeUrl`, `nodePort` and
   *   `appId` are read later by `p_setup1`.
   */
  constructor(options) {
    super(options);
    this.options = options;
    this.session = undefined; // Assigned by p_setup1 once connected.
    this.name = 'FLUENCE';
    this.supportURLs = ['fluence'];
    this.supportFunctions = [
      'fetch',
      'list',
      'add',
      'newlisturls',
      'newdatabase',
      'newtable',
      'get',
      'set',
      'getall',
      'keys',
    ];
    this.supportFeatures = [];
    this.status = Transport.STATUS_LOADED;
  }

  /**
   * Create the transport from `options.fluence` merged over the defaults and
   * register it with `Transports`.
   *
   * @param {object} options - Global options; only `options.fluence` is read.
   * @returns {TransportFLUENCE} The newly created (not yet connected) transport.
   */
  static setup0(options) {
    const combinedOptions = Transport.mergeoptions(defaultOptions, options.fluence);

    console.assert(combinedOptions.nodeUrl, 'Fluence Node url should be specified');
    console.assert(combinedOptions.nodePort, 'Fluence Node port should be specified');
    console.assert(combinedOptions.appId, 'Fluence AppId should be specified');

    const transport = new TransportFLUENCE(combinedOptions);
    Transports.addtransport(transport);
    return transport;
  }

  /**
   * Open the Fluence session and update `this.status` accordingly.
   *
   * `cb`, when given, is invoked twice with this transport: once after the
   * status becomes STARTING and once after the attempt has finished
   * (CONNECTED or FAILED). Failures are logged and recorded in the status
   * rather than rethrown.
   *
   * NOTE(review): `fluence` is not required in this file; it is presumably a
   * global provided by the loader — confirm.
   *
   * @param {function(TransportFLUENCE): void} [cb] - Status-change callback.
   * @returns {Promise<TransportFLUENCE>} This transport.
   */
  async p_setup1(cb) {
    try {
      this.status = Transport.STATUS_STARTING;
      debug('connecting...');

      if (cb) cb(this);

      // Random identifier handed to directConnect as its fourth argument.
      const rndString = Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15);
      this.session = fluence.directConnect(this.options.nodeUrl, this.options.nodePort, this.options.appId, rndString);

      debug('connected.');

      this.status = Transport.STATUS_CONNECTED;
    } catch (err) {
      console.error(this.name, 'failed to start', err);
      this.status = Transport.STATUS_FAILED;
    }

    if (cb) cb(this);
    return this;
  }

  /**
   * Mark the transport CONNECTED whenever a session object exists, then defer
   * to the base class implementation.
   *
   * @returns {Promise<*>} Whatever `Transport#p_status` resolves to.
   */
  async p_status() {
    if (this.session !== null && this.session !== undefined) {
      this.status = Transport.STATUS_CONNECTED;
    }

    return super.p_status();
  }

  /**
   * Parse `url` with the legacy `url.parse` API and verify its scheme.
   *
   * @param {string|object} url - URL to parse.
   * @returns {object} The parsed URL object (callers read `.path` and `.href`).
   * @throws {errors.TransportError} When the protocol is not `fluence:`.
   */
  parseUrl(url) {
    const parsedUrl = Url.parse(url);
    if (parsedUrl.protocol !== 'fluence:') {
      throw new errors.TransportError(`TransportFLUENCE Error encountered retrieving val: url (${parsedUrl.href}) is not a valid FLUENCE url | protocol = ${parsedUrl.protocol}`);
    }

    debug('retrieve url', parsedUrl.href);

    return parsedUrl;
  }

  /**
   * Build the `fluence:` URL for a public key.
   *
   * @param {string|object} pubkey - Either the key itself or an object with an
   *   own `keypair` property, in which case `keypair.signingexport()` is used.
   * @returns {string} `fluence:/fluence/<URI-encoded key>`.
   */
  newKey(pubkey) {
    // Use a local instead of reassigning the parameter, and call
    // hasOwnProperty via Object.prototype so objects that shadow it still work.
    const hasKeypair = Object.prototype.hasOwnProperty.call(pubkey, 'keypair');
    const exportedKey = hasKeypair ? pubkey.keypair.signingexport() : pubkey;

    return `fluence:/fluence/${encodeURIComponent(exportedKey)}`;
  }

  /**
   * Decode a textual Redis (RESP-style) reply.
   *
   * Supported reply types: simple string (`+`), integer (`:`), bulk string
   * (`$`) and array (`*`, including nested arrays). A nil bulk string or nil
   * array (`$-1` / `*-1`) decodes to `undefined`, both at the top level and
   * inside arrays. Lines are assumed to be terminated by CRLF.
   *
   * @param {string} result - Raw reply text.
   * @returns {string|number|Array|undefined} The decoded reply.
   * @throws {errors.TransportError} On an error reply (`-`), an unknown type
   *   byte, a malformed length/integer, or a non-string input.
   */
  parseRedisResponse(result) {
    if (typeof result !== 'string') {
      throw new errors.TransportError(`TransportFLUENCE Error Redis response is not a string: ${result}`);
    }

    if (result.startsWith('$-1')) {
      return undefined;
    }

    const CRLF_LENGTH = 2;

    // Convert a header such as "12" or "-1" to a number, rejecting anything
    // that is not a plain (optionally negative) integer.
    const readInteger = (header, text) => {
      if (!/^-?[0-9]+$/.test(header)) {
        throw new errors.TransportError(`TransportFLUENCE Error malformed Redis response: ${text}`);
      }
      return Number(header);
    };

    // Parse exactly one reply starting at index 0 of `text`. Returns the
    // decoded data plus the number of characters consumed, so that array
    // parsing (including nested arrays) can advance to the next element.
    const parseResponsePart = (text) => {
      const type = text.charAt(0);
      // Everything after the type byte up to the end of the first line.
      const header = /^[^\r\n]*/.exec(text.substring(1))[0];
      const bodyStart = 1 + header.length + CRLF_LENGTH;

      switch (type) {
        case '+':
          return { data: header, offset: bodyStart };
        case ':':
          return { data: readInteger(header, text), offset: bodyStart };
        case '-':
          throw new errors.TransportError(`TransportFLUENCE Error Redis returned an error: ${header}`);
        case '$': {
          const count = readInteger(header, text);
          if (count < 0) {
            // Nil bulk string: header only, no payload line follows.
            return { data: undefined, offset: bodyStart };
          }
          // NOTE(review): RESP lengths are byte counts while substring()
          // counts UTF-16 code units; they agree for ASCII payloads only —
          // confirm how the session's asString() decodes the reply.
          return {
            data: text.substring(bodyStart, bodyStart + count),
            offset: bodyStart + count + CRLF_LENGTH,
          };
        }
        case '*': {
          const count = readInteger(header, text);
          if (count < 0) {
            return { data: undefined, offset: bodyStart };
          }
          let offset = bodyStart;
          const list = [];
          for (let i = 0; i < count; i++) {
            const item = parseResponsePart(text.substring(offset));
            list.push(item.data);
            offset += item.offset;
          }
          return { data: list, offset };
        }
        default:
          throw new errors.TransportError(`TransportFLUENCE Error unsupprted Redis response type: ${type}, response: ${text}`);
      }
    };

    return parseResponsePart(result).data;
  }

  /**
   * Fetch the value stored under the URL's path with `GET`.
   *
   * @param {string|object} url - A `fluence:` URL.
   * @returns {Promise<*>} The stored value, JSON-decoded when it is a string.
   * @throws {errors.TransportError} When the URL is invalid or nothing is stored.
   */
  async p_rawfetch(url) {
    const parsedUrl = this.parseUrl(url);
    const key = parsedUrl.path;

    const result = await this.session.request(`GET ${key}`);
    const data = this.parseRedisResponse(result.asString());

    if (!data) {
      // Report parsedUrl.href: `url` is usually a plain string with no `.href`.
      throw new errors.TransportError(`TransportFLUENCE unable to retrieve: ${parsedUrl.href}`);
    }

    return parseJsonIfString(data);
  }

  /**
   * Fetch every element of the list stored under the URL's path
   * (`LRANGE key 0 -1`).
   *
   * @param {string|object} url - A `fluence:` URL.
   * @returns {Promise<Array>} List items, each JSON-decoded when it is a string.
   * @throws {errors.TransportError} When the URL is invalid or no data comes back.
   */
  async p_rawlist(url) {
    const parsedUrl = this.parseUrl(url);
    const key = parsedUrl.path;

    const result = await this.session.request(`LRANGE ${key} 0 -1`);
    const data = this.parseRedisResponse(result.asString());

    if (!data) {
      // Report parsedUrl.href: `url` is usually a plain string with no `.href`.
      throw new errors.TransportError(`TransportFLUENCE unable to retrieve list: ${parsedUrl.href}`);
    }

    return data.map((listItem) => parseJsonIfString(listItem));
  }

  /**
   * Append a signature to the list stored under the URL's path (`RPUSH`).
   *
   * @param {string|object} url - A `fluence:` URL.
   * @param {object} sig - Signature object; a shallow copy is passed through
   *   its own `preflight` method and serialized with canonical JSON.
   * @returns {Promise<void>}
   */
  async p_rawadd(url, sig) {
    const parsedUrl = this.parseUrl(url);
    const key = parsedUrl.path;

    const data = canonicaljson.stringify(sig.preflight(Object.assign({}, sig)));

    await this.session.request(`RPUSH ${key} ${data}`);
  }

  /**
   * @param {string|object} cl - Key or key-bearing object accepted by `newKey`.
   * @returns {Promise<string[]>} Two-element array holding the same URL twice.
   */
  async p_newlisturls(cl) {
    const key = this.newKey(cl);

    return [key, key];
  }

  /**
   * @param {string|object} pubkey - Key or key-bearing object accepted by `newKey`.
   * @returns {Promise<{publicurl: string, privateurl: string}>} Both URLs are identical.
   */
  async p_newdatabase(pubkey) {
    const key = this.newKey(pubkey);
    return {
      publicurl: key,
      privateurl: key,
    };
  }

  /**
   * @param {string|object} pubkey - Required; see `newKey`.
   * @param {string} table - Table name appended to the database URLs.
   * @returns {Promise<{privateurl: string, publicurl: string}>}
   * @throws {errors.CodingError} When `pubkey` is falsy.
   */
  async p_newtable(pubkey, table) {
    if (!pubkey) {
      throw new errors.CodingError("p_newtable currently requires a pubkey");
    }

    const { publicurl, privateurl } = await this.p_newdatabase(pubkey);
    return {
      privateurl: `${privateurl}/${table}`,
      publicurl: `${publicurl}/${table}`,
    };
  }

  /**
   * Store one or several hash fields under `url` using `HSET`.
   *
   * @param {string} url - Used verbatim as the hash key.
   * @param {string|object} keyvalues - A single field name (then `value` is
   *   stored), or an object mapping field names to values.
   * @param {*} [value] - Value for the single-field form.
   * @returns {Promise<void>}
   */
  async p_set(url, keyvalues, value) {
    if (typeof keyvalues === 'string') {
      await this.session.request(`HSET ${url} ${keyvalues} ${canonicaljson.stringify(value)}`);
      return;
    }

    console.assert(!Array.isArray(keyvalues), 'TransportFLUENCE - shouldnt pass an array as the keyvalues');

    // One HSET per field, issued concurrently.
    await Promise.all(
      Object.keys(keyvalues).map((hKey) => this.session.request(`HSET ${url} ${hKey} ${canonicaljson.stringify(keyvalues[hKey])}`))
    );
  }

  /**
   * Read one field (`HGET`) or several fields (`HMGET`) of the hash at `url`.
   *
   * @param {string} url - Used verbatim as the hash key.
   * @param {string|string[]} keys - Field name, or array of field names.
   * @returns {Promise<*>} For an array: an object mapping each requested field
   *   to its JSON-decoded value (`undefined` for fields with a nil reply).
   *   Otherwise the single JSON-decoded value.
   */
  async p_get(url, keys) {
    if (Array.isArray(keys)) {
      const result = await this.session.request(`HMGET ${url} ${keys.join(' ')}`);
      const data = this.parseRedisResponse(result.asString());

      // Replies are positional: data[i] belongs to keys[i].
      const store = {};
      keys.forEach((key, index) => {
        store[key] = parseJsonIfString(data[index]);
      });
      return store;
    }

    const result = await this.session.request(`HGET ${url} ${keys}`);
    const data = this.parseRedisResponse(result.asString());

    return parseJsonIfString(data);
  }

  /**
   * @param {string} url - Used verbatim as the hash key.
   * @returns {Promise<*>} The decoded `HKEYS` reply.
   */
  async p_keys(url) {
    const result = await this.session.request(`HKEYS ${url}`);

    return this.parseRedisResponse(result.asString());
  }

  /**
   * Read the whole hash at `url` (`HGETALL`).
   *
   * @param {string} url - Used verbatim as the hash key.
   * @returns {Promise<object>} Field name to JSON-decoded value.
   */
  async p_getall(url) {
    const result = await this.session.request(`HGETALL ${url}`);
    const dataArray = this.parseRedisResponse(result.asString());

    // The reply is a flat [field, value, field, value, ...] list.
    const store = {};
    for (let index = 0; index < dataArray.length; index += 2) {
      store[dataArray[index]] = parseJsonIfString(dataArray[index + 1]);
    }
    return store;
  }

  /**
   * Delete one or several fields of the hash at `url` (`HDEL`).
   *
   * @param {string} url - Used verbatim as the hash key.
   * @param {string|string[]} keys - Field name, or array of field names.
   * @returns {Promise<void>}
   */
  async p_delete(url, keys) {
    const fields = typeof keys === "string" ? keys : keys.join(' ');
    await this.session.request(`HDEL ${url} ${fields}`);
  }
}
310 |
|
// Make the class discoverable by name through the shared Transports registry.
Transports._transportclasses.FLUENCE = TransportFLUENCE;

// Static loader metadata attached to the class.
// NOTE(review): presumably `scripts` lists bundle(s) to load and `requires`
// maps a global name to a module name — confirm against the loader.
TransportFLUENCE.requires = { fluence: "fluence" };
TransportFLUENCE.scripts = ["fluence@0.3.14-no-webj/bundle/bundle.js"];

// Export the class as the module itself and keep the local `exports` alias in sync.
module.exports = TransportFLUENCE;
exports = module.exports;
|