UNPKG

17.2 kBJavaScriptView Raw
// Note: now that YJS is included by the consumer (Node.js or HTML), it is unclear whether this requires a `new Y` followed by `Y.extend("y-text", ...)`. YJS is no longer maintained.
2/*
This Transport layer builds on the YJS DB and uses IPFS as its transport.
4
5Y Lists have listeners and generate events - see docs at ...
6*/
7const Url = require('url');
8const debugyjs = require('debug')('dweb-transports:yjs');
9const canonicaljson = require('@stratumn/canonicaljson');
10
//const Y = require('yjs/dist/y.js'); // Explicit require of dist/y.js to get around a webpack warning, but it causes a different error in YJS
12/* See TransportYJS.requires at bottom
13const Y = require('yjs');
14require('y-memory')(Y);
15require('y-array')(Y);
16require('y-text')(Y);
17require('y-map')(Y);
18require('y-ipfs-connector')(Y);
19require('y-indexeddb')(Y);
20//require('y-leveldb')(Y); //- can't be there for browser, node seems to find it ok without this, though not sure why..
21*/
22// Utility packages (ours) And one-liners
// Return a promise that resolves with `val` once `ms` milliseconds have passed (via setTimeout).
function delay(ms, val) {
    return new Promise((resolve) => {
        setTimeout(() => resolve(val), ms);
    });
}
24
25// Other Dweb modules
26const errors = require('./Errors'); // Standard Dweb Errors
27const Transport = require('./Transport.js'); // Base class for TransportXyz
28const Transports = require('./Transports'); // Manage all Transports that are loaded
29const utils = require('./utils'); // Utility functions
30
// Defaults merged with the caller's `yjs` options in TransportYJS.setup0.
const defaultoptions = {
    db: {
        name: 'indexeddb', // leveldb in node
    },
    connector: {
        name: 'ipfs',
        //ipfs: ipfs, // Filled in by p_setup2 once an IPFS transport exists
    },
};
40
class TransportYJS extends Transport {
    /*
    YJS specific transport - over IPFS, but could probably use other YJS transports.

    Fields:
    options             Dictionary of options passed to the constructor; p_setup2 adds connector.ipfs
    name                "YJS", used in console logs etc
    supportURLs         URL schemes handled (['yjs'])
    supportFunctions    Names of the transport functions implemented here
    supportFeatures     Empty: no noCache support, and the data is mutable
    status              One of the Transport.STATUS_xxx values
    yarrays             Cache of Y instances keyed by URL string (filled in by p__y)
     */

    constructor(options) {
        super(options);
        this.options = options; // Dictionary of options
        this.name = "YJS"; // For console log etc
        this.supportURLs = ['yjs'];
        this.supportFunctions = ['fetch', 'add', 'list', 'listmonitor', 'newlisturls',
            'connection', 'get', 'set', 'getall', 'keys', 'newdatabase', 'newtable', 'monitor']; // Only does list functions, Does not support reverse,
        this.supportFeatures = []; // Doesnt support noCache and is mutable
        // Created here (and again in p_setup2) so that a lookup made before p_setup2 misses cleanly
        // instead of failing with a TypeError on an undefined cache.
        this.yarrays = {};
        this.status = Transport.STATUS_LOADED;
    }

    async p__y(url, opts) {
        /*
        Utility function to get Y for this URL with appropriate options and open a new connection if not already

        url:        URL string (or parsed URL with a .href) to find list of
        opts:       Options to add to defaults
        resolves:   Y
        throws:     whatever creating the Y throws (logged, then rethrown)

        The cache is keyed by URL only, so the first call for a URL decides which `share` types its Y has.
         */
        const urlstr = (typeof url === "string") ? url : url.href; // Convert if its a parsed URL
        console.assert(urlstr.startsWith("yjs:/yjs/"));
        try {
            const cached = this.yarrays[urlstr];
            if (cached) {
                //debugyjs("Found Y for %s", urlstr);
                return cached;
            }
            const options = Transport.mergeoptions(this.options, {connector: {room: urlstr}}, opts); // Copies options, ipfs will be set already
            //debugyjs("Creating Y for %s", urlstr);
            // NOTE(review): Y is not defined in this file (the requires at the top are commented out),
            // so it has to be supplied by whoever loads this module - see TransportYJS.requires.
            return this.yarrays[urlstr] = await Y(options);
        } catch(err) {
            console.error("Failed to initialize Y", err.message);
            throw err;
        }
    }

    async p__yarray(url) {
        /*
        Utility function to get a Y sharing an Array for this URL and open a new connection if not already
        url:        URL string to find list of
        resolves:   Y - use y.share.array
        */
        return this.p__y(url, { share: {array: "Array"}}); // Copies options, ipfs will be set already
    }

    async p_connection(url) {
        /*
        Utility function to get a Y sharing a Map for this URL and open a new connection if not already
        url:        URL string to find table of
        resolves:   Y - a connection to use for get's etc (y.share.map)
        */
        return this.p__y(url, { share: {map: "Map"}}); // Copies options, ipfs will be set already
    }

    //TODO-SPLIT define load()

    static setup0(options) {
        /*
        First part of setup, create obj, add to Transports but dont attempt to connect, typically called instead of p_setup if want to parallelize connections.

        options:    Dictionary whose `yjs` field (if any) overrides defaultoptions
        returns:    the new TransportYJS
        */
        const combinedoptions = Transport.mergeoptions(defaultoptions, options.yjs);
        debugyjs("YJS options %o", combinedoptions);
        const t = new TransportYJS(combinedoptions); // Note doesnt start IPFS or Y
        Transports.addtransport(t);
        return t;
    }

    async p_setup2(cb) {
        /*
        This sets up for Y connections, which are opened each time a resource is listed, added to, or listmonitored.
        p_setup2 is defined because IPFS will have started during the p_setup1 phase.

        cb:         optional function(transport), called once when starting and once when finished
        resolves:   this - failures are logged and recorded as STATUS_FAILED rather than thrown
        */
        try {
            this.status = Transport.STATUS_STARTING; // Should display, but probably not refreshed in most case
            if (cb) cb(this);
            this.options.connector.ipfs = Transports.ipfs().ipfs; // Find an IPFS to use (IPFS's should be starting in p_setup1)
            this.yarrays = {};
            await this.p_status();
        } catch(err) {
            console.error(this.name,"failed to start",err);
            this.status = Transport.STATUS_FAILED;
        }
        if (cb) cb(this);
        return this;
    }

    async p_status() {
        /*
        Return a string for the status of a transport. No particular format, but keep it short as it will probably be in a small area of the screen.
        For YJS, its online if IPFS is.
        */
        const online = await this.options.connector.ipfs.isOnline();
        this.status = online ? Transport.STATUS_CONNECTED : Transport.STATUS_FAILED;
        return super.p_status();
    }

    // ======= LISTS ========

    async p_rawlist(url) {
        /*
        Fetch all the objects in a list, these are identified by the url of the public key used for signing.
        (Note this is the 'signedby' parameter of the p_rawadd call, not the 'url' parameter
        Returns a promise that resolves to the list.
        Each item of the list is a dict: {"url": url, "date": date, "signature": signature, "signedby": signedby}
        List items may have other data (e.g. reference ids of underlying transport)

        :param string url: String with the url that identifies the list.
        :resolve array: An array of objects as stored on the list.
        */
        const y = await this.p__yarray(url);
        // Cant filter on signedby since url is the YJS URL, not the URL of the CL that signed it. (upper layers verify, which filters)
        // Results and failures are logged by Transports
        return y.share.array.toArray();
    }

    listmonitor(url, callback, {current=false}={}) {
        /*
        Setup a callback called whenever an item is added to a list, typically it would be called immediately after a p_rawlist to get any more items not returned by p_rawlist.

        :param url:         string Identifier of list (as used by p_rawlist and "signedby" parameter of p_rawadd
        :param callback:    function(obj)  Callback for each new item added to the list
                            obj is same format as p_rawlist or p_rawreverse
        :param current:     boolean - true if the callback should also be called for items already on the list
        :throws CodingError: if no Y has been opened for this url yet
         */
        const y = this.yarrays[typeof url === "string" ? url : url.href];
        if (!y) {
            throw new errors.CodingError("Should always exist before calling listmonitor - async call p__yarray(url) to create");
        }
        if (current) {
            y.share.array.toArray().forEach((obj) => callback(obj));
        }
        y.share.array.observe((event) => {
            if (event.type === 'insert') { // Currently ignoring deletions.
                debugyjs('resources inserted %o', event.values);
                //cant filter because url is YJS local, not signer, callback should filter
                event.values.forEach((obj) => callback(obj));
            }
        });
    }

    rawreverse() {
        /*
        Similar to p_rawlist, but return the list item of all the places where the object url has been listed.
        The url here corresponds to the "url" parameter of p_rawadd

        Not implemented: always throws ToBeImplementedError.
         */
        //TODO-REVERSE this needs implementing once list structure on IPFS more certain
        throw new errors.ToBeImplementedError("Undefined function TransportYJS.rawreverse");
    }

    async p_rawadd(url, sig) {
        /*
        Store a new list item, it should be stored so that it can be retrieved either by "signedby" (using p_rawlist) or
        by "url" (with p_rawreverse). The underlying transport does not need to guarantee the signature,
        an invalid item on a list should be rejected on higher layers.

        :param string url: String identifying list to post to
        :param Signature sig: Signature object containing at least:
            date - date of signing in ISO format,
            urls - array of urls for the object being signed
            signature - verifiable signature of date+urls
            signedby - urls of public key used for the signature
        :resolve undefined:
        */
        // Logged by Transports
        console.assert(url && sig.urls.length && sig.signature && sig.signedby.length, "TransportYJS.p_rawadd args", url, sig);
        const value = sig.preflight(Object.assign({}, sig)); // preflight is handed a shallow copy of sig
        const y = await this.p__yarray(url);
        y.share.array.push([value]);
    }

    p_newlisturls(cl) {
        /*
        Choose the URLs for a new list belonging to cl.

        :param cl:  object with _publicurls (array of URL strings) and a keypair
        :return:    two-element array holding the same URL twice. A URL taken from cl._publicurls is a
                    parsed Url object, a generated one is a string (p__y accepts both).
        */
        // NOTE(review): Url.parse reports protocols with their trailing colon ("ipfs:"), so the "ipfs"
        // comparison below never matches and only yjs: URLs are picked up. Left as is, since matching
        // ipfs: URLs would hand p__y a URL that does not start with "yjs:/yjs/" - confirm the intent.
        let u = cl._publicurls.map(urlstr => Url.parse(urlstr))
            .find(parsedurl =>
                (parsedurl.protocol === "ipfs" && parsedurl.pathname.includes('/ipfs/'))
                || (parsedurl.protocol === "yjs:"));
        if (!u) {
            u = `yjs:/yjs/${ cl.keypair.verifyexportmultihashsha256_58() }`; // Pretty random, but means same test will generate same list
        }
        return [u,u];
    }

    // ======= KEY VALUE TABLES ========

    async p_newdatabase(pubkey) {
        /*
        Build the URLs of the database for a public key. No action is needed to create it.

        :param pubkey:  export of a public key, or an object with a keypair (its signingexport() is used)
        :resolve:       {publicurl, privateurl} - both the same yjs:/yjs/... URL
        */
        //if (pubkey instanceof Dweb.PublicPrivate)
        if (Object.prototype.hasOwnProperty.call(pubkey, "keypair"))
            pubkey = pubkey.keypair.signingexport();
        // By this point pubkey should be an export of a public key of form xyz:abc where xyz
        // specifies the type of public key (NACL VERIFY being the only kind we expect currently)
        const u = `yjs:/yjs/${encodeURIComponent(pubkey)}`;
        return {"publicurl": u, "privateurl": u};
    }

    //TODO maybe change the listmonitor / monitor code for to use "on" and the structure of PP.events
    //TODO but note https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Proxy about Proxy which might be suitable, prob not as doesnt map well to lists
    async p_newtable(pubkey, table) {
        /*
        Build the URLs of a table inside the database for pubkey.

        :param pubkey:  as for p_newdatabase, required
        :param table:   name of the table, appended to the database URL
        :resolve:       {privateurl, publicurl}
        :throws CodingError: if pubkey is missing
        */
        if (!pubkey) throw new errors.CodingError("p_newtable currently requires a pubkey");
        const database = await this.p_newdatabase(pubkey);
        // If have use cases without a database, then call p_newdatabase first
        return { privateurl: `${database.privateurl}/${table}`, publicurl: `${database.publicurl}/${table}`}; // No action required to create it
    }

    async p_set(url, keyvalues, value) { // url = yjs:/yjs/database/table
        /*
        Set key values
        keyvalues:  string (key) in which case value should be set there OR
                    object in which case value is ignored

        Values are always stored as canonical JSON strings, which is what _p_get and monitor decode.
         */
        const y = await this.p_connection(url);
        if (typeof keyvalues === "string") {
            y.share.map.set(keyvalues, canonicaljson.stringify(value));
        } else {
            // Encode exactly like the single-key form: a raw string stored here would make _p_get's JSON.parse throw
            Object.keys(keyvalues).forEach((key) => y.share.map.set(key, canonicaljson.stringify(keyvalues[key])));
        }
    }

    _p_get(y, keys) {
        /*
        Read one or more keys from the map shared by y. Synchronous, the p_connection should have synchronised.

        y:          Y as resolved by p_connection
        keys:       a single key, or an array of keys
        returns:    the decoded value, or for an array of keys a dict of key => decoded value
                    (strings are JSON decoded, anything else - including undefined - is passed through)
        */
        const decode = (val) => (typeof val === "string" ? JSON.parse(val) : val); // Handle undefined
        if (!Array.isArray(keys)) {
            return decode(y.share.map.get(keys));
        }
        return keys.reduce((result, key) => {
            result[key] = decode(y.share.map.get(key));
            return result;
        }, {});
    }

    async p_get(url, keys) {
        /*
        Get the value(s) of a key or array of keys of the table at url - see _p_get for the result.
        */
        return this._p_get(await this.p_connection(url), keys);
    }

    async p_delete(url, keys) {
        /*
        Delete a key (string) or each of an array of keys from the table at url.
        */
        const y = await this.p_connection(url);
        if (typeof keys === "string") {
            y.share.map.delete(keys);
        } else {
            keys.forEach((key) => y.share.map.delete(key)); // Surprisingly this is sync, the p_connection should have synchronised
        }
    }

    async p_keys(url) {
        /*
        Resolve to the keys of the table at url.
        */
        const y = await this.p_connection(url);
        return y.share.map.keys(); // Surprisingly this is sync, the p_connection should have synchronised
    }

    async p_getall(url) {
        /*
        Resolve to a dict of every key => decoded value of the table at url.
        */
        const y = await this.p_connection(url);
        const keys = y.share.map.keys(); // Surprisingly this is sync, the p_connection should have synchronised
        return this._p_get(y, keys);
    }

    async p_rawfetch(url) {
        /*
        Fetch the whole table at url wrapped as { table: "keyvaluetable", _map: {key: value, ...} }.
        */
        return { // See identical structure in TransportHTTP
            table: "keyvaluetable", //TODO-KEYVALUE its unclear if this is the best way, as maybe want to know the real type of table e.g. domain
            _map: await this.p_getall(url)
        }; // Data struc is ok as SmartDict.p_fetch will pass to KVT constructor
    }

    async monitor(url, callback, {current=false}={}) {
        /*
        Setup a callback called whenever a key of a table is added or updated, typically it would be called immediately after a p_getall to get any more items not returned by p_getall.
        Stack: KVT()|KVT.p_new => KVT.monitor => (a: Transports.monitor => YJS.monitor)(b: dispatchEvent)

        :param url:         string or parsed URL identifying the table
        :param callback:    function({type, key, value})  Callback for each key set on the table
        :param current:     boolean - true if want events for current items on table
        :throws CodingError: (as a rejection) if no Y has been opened for this url yet
         */
        url = typeof url === "string" ? url : url.href;
        const y = this.yarrays[url];
        if (!y) {
            throw new errors.CodingError("Should always exist before calling monitor - async call p_connection(url) to create");
        }
        const decode = (val) => (typeof val === "string" ? JSON.parse(val) : val);
        if (current) {
            // Iterate over existing items with callback
            y.share.map.keys().forEach((k) => {
                callback({type: "set", key: k, value: decode(y.share.map.get(k))});
            });
        }
        y.share.map.observe((event) => {
            if (!['add','update'].includes(event.type)) { return; } // Currently ignoring deletions.
            debugyjs("YJS monitor: %o %s %s %o", url, event.type, event.name, event.value);
            // ignores event.path (only in observeDeep) and event.object
            if (event.type === "update" && event.oldValue === event.value) { return; } // Dont trigger on an unchanged update as seeing some loops with p_set
            callback({
                "type": "set", // both "add" and "update" are reported as a "set"
                "value": decode(event.value),
                "key": event.name,
            });
        });
    }

    static async p_test(opts={}) {
        /*
        Connect with opts, check the transport reports STATUS_CONNECTED, then run the key-value table test.
        Any exception is logged and rethrown.
        */
        console.log("TransportYJS.test");
        try {
            const transport = await this.p_setup(opts);
            console.log("YJS connected");
            let res = await transport.p_info();
            console.log("TransportYJS info=",res);
            res = await transport.p_status();
            console.assert(res === Transport.STATUS_CONNECTED);
            await transport.p_test_kvt("NACL%20VERIFY");
        } catch(err) {
            console.log("Exception thrown in TransportYJS.test:", err.message);
            throw err;
        }
    }

}
//TransportYJS.Y = Y; // Allow node tests to find it
// Register the class under the name "YJS" so Transports can look it up.
Transports._transportclasses["YJS"] = TransportYJS;
// Packages this transport needs (none are required at the top of this file any more).
TransportYJS.requires = ["yjs", "y-array", "y-memory", "y-text", "y-map", "y-ipfs-connector", "y-indexeddb"]; //~130KB
// If start using YJS again, it needs something like Y = new yjs; Y.extend(y-array, y-memory ... - see yjs/README.md
// Script paths matching the packages in TransportYJS.requires.
// NOTE(review): the last two paths have no ".js" extension, unlike the others - confirm that is intended.
TransportYJS.scripts = ["yjs/dist/y.js", "y-array/dist/y-array.js", "y-memory/dist/y-memory.js",
    "y-text/dist/y-text.js", "y-map/dist/y-map.js", "y-ipfs-connector/dist/y-ipfs-connector", "y-indexeddb/dist/y-indexeddb"]; //~130KB