// Note: now that YJS is included by the consumer (nodejs or html), it is unclear whether this requires a `new Y` followed by `Y.extend("y-text", ...)` - but YJS is no longer maintained.
|
/*
This Transport layer builds on the YJS DB and uses IPFS as its transport.

Y Lists have listeners and generate events - see docs at ...
*/
|
7 | const Url = require('url');
|
8 | const debugyjs = require('debug')('dweb-transports:yjs');
|
9 | const canonicaljson = require('@stratumn/canonicaljson');
|
10 |
|
//const Y = require('yjs/dist/y.js'); // Explicit require of dist/y.js to get around a webpack warning, but it causes a different error in YJS
|
12 | /* See TransportYJS.requires at bottom
|
13 | const Y = require('yjs');
|
14 | require('y-memory')(Y);
|
15 | require('y-array')(Y);
|
16 | require('y-text')(Y);
|
17 | require('y-map')(Y);
|
18 | require('y-ipfs-connector')(Y);
|
19 | require('y-indexeddb')(Y);
|
20 | //require('y-leveldb')(Y); //- can't be there for browser, node seems to find it ok without this, though not sure why..
|
21 | */
|
22 | // Utility packages (ours) And one-liners
|
/*
Promise-based timer.

ms:       number of milliseconds to wait
val:      value to resolve with once the wait is over
resolves: val
 */
function delay(ms, val) {
    return new Promise((resolve) => {
        const finish = () => resolve(val);
        setTimeout(finish, ms);
    });
}
|
24 |
|
25 | // Other Dweb modules
|
26 | const errors = require('./Errors'); // Standard Dweb Errors
|
27 | const Transport = require('./Transport.js'); // Base class for TransportXyz
|
28 | const Transports = require('./Transports'); // Manage all Transports that are loaded
|
29 | const utils = require('./utils'); // Utility functions
|
30 |
|
// Baseline YJS options; TransportYJS.setup0 merges the caller's `yjs` options over these.
const defaultoptions = {
    // Persistence adaptor used by Y
    db: { name: 'indexeddb' }, // leveldb in node
    // Connector used by Y; its `ipfs` field is filled in later (p_setup2), once an IPFS instance exists
    connector: { name: 'ipfs' },
};
|
40 |
|
class TransportYJS extends Transport {
    /*
    YJS specific transport - over IPFS, but could probably use other YJS transports

    Fields (all set by this class):
    options           Dictionary of options handed to the constructor; p_setup2 adds options.connector.ipfs
    name              "YJS", used in console logs etc
    supportURLs       URL schemes this transport handles
    supportFunctions  Names of the functions this transport implements
    supportFeatures   Empty - doesnt support noCache and is mutable
    status            One of the Transport.STATUS_xxx values
    yarrays           Cache of url => Y instance, created (empty) by p_setup2 and filled by p__y
     */

    constructor(options) {
        super(options);
        this.options = options;         // Dictionary of options
        this.name = "YJS";              // For console log etc
        this.supportURLs = ['yjs'];
        this.supportFunctions = ['fetch', 'add', 'list', 'listmonitor', 'newlisturls',
            'connection', 'get', 'set', 'getall', 'keys', 'newdatabase', 'newtable', 'monitor']; // Does not support reverse
        this.supportFeatures = [];      // Doesnt support noCache and is mutable
        this.status = Transport.STATUS_LOADED;
    }

    async p__y(url, opts) {
        /*
        Utility function to get Y for this URL with appropriate options and open a new connection if not already

        url:        URL string (or parsed URL with a .href) to find list of
        opts:       Options to add to defaults
        resolves:   Y
        throws:     Whatever goes wrong while looking up or creating Y (logged first)
         */
        if (typeof url !== "string") { url = url.href; }   // Convert if its a parsed URL
        console.assert(url.startsWith("yjs:/yjs/"));
        try {
            const cached = this.yarrays[url];   // this.yarrays only exists once p_setup2 has run
            if (cached) {
                return cached;
            }
            const options = Transport.mergeoptions(this.options, {connector: {room: url}}, opts); // Copies options, ipfs will be set already
            // NOTE(review): Y is not declared in this file (its requires are commented out, see TransportYJS.requires);
            // presumably the consumer provides it as a global - confirm before relying on this path.
            const y = await Y(options);
            this.yarrays[url] = y;      // Cache so later calls for the same url reuse the connection
            return y;
        } catch(err) {
            console.error("Failed to initialize Y", err.message);
            throw err;
        }
    }

    async p__yarray(url) {
        /*
        Utility function to get Y (sharing an "Array") for this URL and open a new connection if not already
        url:        URL string to find list of
        resolves:   Y
         */
        return this.p__y(url, { share: {array: "Array"}}); // Copies options, ipfs will be set already
    }

    async p_connection(url) {
        /*
        Utility function to get Y (sharing a "Map") for this URL and open a new connection if not already
        url:        URL string to find table of
        resolves:   Y - a connection to use for get's etc.
         */
        return this.p__y(url, { share: {map: "Map"}}); // Copies options, ipfs will be set already
    }

    //TODO-SPLIT define load()

    static setup0(options) {
        /*
        First part of setup, create obj, add to Transports but dont attempt to connect, typically called instead of p_setup if want to parallelize connections.

        options:    Dictionary whose "yjs" field (if any) is merged over defaultoptions
        returns:    The new TransportYJS
         */
        const combinedoptions = Transport.mergeoptions(defaultoptions, options.yjs);
        debugyjs("YJS options %o", combinedoptions);
        const t = new TransportYJS(combinedoptions);   // Note doesnt start IPFS or Y
        Transports.addtransport(t);
        return t;
    }

    async p_setup2(cb) {
        /*
        This sets up for Y connections, which are opened each time a resource is listed, added to, or listmonitored.
        p_setup2 is defined because IPFS will have started during the p_setup1 phase.

        cb:         Optional function(transport), called once when status becomes STARTING and once more when setup ends
        resolves:   this
        Does not throw: any failure is logged and leaves status as Transport.STATUS_FAILED
         */
        try {
            this.status = Transport.STATUS_STARTING;   // Should display, but probably not refreshed in most case
            if (cb) cb(this);
            this.options.connector.ipfs = Transports.ipfs().ipfs; // Find an IPFS to use (IPFS's should be starting in p_setup1)
            this.yarrays = {};
            await this.p_status();
        } catch(err) {
            console.error(this.name,"failed to start",err);
            this.status = Transport.STATUS_FAILED;
        }
        if (cb) cb(this);
        return this;
    }

    async p_status() {
        /*
        Update this.status and return the result of the superclass p_status.
        For YJS, its online if IPFS is.
         */
        const online = await this.options.connector.ipfs.isOnline();
        this.status = online ? Transport.STATUS_CONNECTED : Transport.STATUS_FAILED;
        return super.p_status();
    }

    // ======= LISTS ========

    async p_rawlist(url) {
        /*
        Fetch all the objects in a list, these are identified by the url of the public key used for signing.
        (Note this is the 'signedby' parameter of the p_rawadd call, not the 'url' parameter
        Returns a promise that resolves to the list.
        Each item of the list is a dict: {"url": url, "date": date, "signature": signature, "signedby": signedby}
        List items may have other data (e.g. reference ids of underlying transport)

        :param string url: String with the url that identifies the list.
        :resolve array: An array of objects as stored on the list.
         */
        // Errors propagate unchanged and results are not logged here (the original notes both are logged by Transports).
        const y = await this.p__yarray(url);
        // Cant filter by signedby since url is the YJS URL, not the URL of the CL that signed it. (upper layers verify, which filters)
        return y.share.array.toArray();
    }

    listmonitor(url, callback, {current=false}={}) {
        /*
        Setup a callback called whenever an item is added to a list, typically it would be called immediately after a p_rawlist to get any more items not returned by p_rawlist.

        :param url:         string Identifier of list (as used by p_rawlist and "signedby" parameter of p_rawadd
        :param callback:    function(obj)  Callback for each new item added to the list
                            obj is same format as p_rawlist or p_rawreverse
        :param current:     boolean - true if want the callback for items already on the list
        :throws CodingError: if no Y has been opened for url yet
         */
        const y = this.yarrays[typeof url === "string" ? url : url.href];
        if (!y) {
            // Same failure mode as monitor(), rather than an assert followed by a TypeError
            throw new errors.CodingError("Should always exist before calling listmonitor - async call p__yarray(url) to create");
        }
        if (current) {
            // toArray is a method - it has to be called before iterating
            y.share.array.toArray().forEach(callback);
        }
        y.share.array.observe((event) => {
            if (event.type === 'insert') { // Currently ignoring deletions.
                debugyjs('resources inserted %o', event.values);
                //cant filter because url is YJS local, not signer, callback should filter
                event.values.forEach(callback);
            }
        });
    }

    rawreverse() {
        /*
        Similar to p_rawlist, but return the list item of all the places where the object url has been listed.
        The url here corresponds to the "url" parameter of p_rawadd

        Not implemented: always throws ToBeImplementedError.
         */
        //TODO-REVERSE this needs implementing once list structure on IPFS more certain
        throw new errors.ToBeImplementedError("Undefined function TransportYJS.rawreverse");
    }

    async p_rawadd(url, sig) {
        /*
        Store a new list item, it should be stored so that it can be retrieved either by "signedby" (using p_rawlist) or
        by "url" (with p_rawreverse). The underlying transport does not need to guarantee the signature,
        an invalid item on a list should be rejected on higher layers.

        :param string url: String identifying list to post to
        :param Signature sig: Signature object containing at least:
            date - date of signing in ISO format,
            urls - array of urls for the object being signed
            signature - verifiable signature of date+urls
            signedby - urls of public key used for the signature
        :resolve undefined:
         */
        // Logged by Transports
        console.assert(url && sig.urls.length && sig.signature && sig.signedby.length, "TransportYJS.p_rawadd args", url, sig);
        const value = sig.preflight(Object.assign({}, sig));   // preflight is handed a shallow copy, not sig itself
        const y = await this.p__yarray(url);
        y.share.array.push([value]);
    }

    p_newlisturls(cl) {
        /*
        Pick the url to use for a new list.

        cl:         Object with _publicurls (array of url strings) and keypair
        returns:    [u, u] - the same value twice. u is a parsed Url when one of cl._publicurls matched,
                    otherwise a "yjs:/yjs/..." string built from the keypair
         */
        // NOTE(review): url.parse() reports protocols with a trailing colon (as the "yjs:" test expects), so the
        // "ipfs" comparison probably never matches - confirm the intent before changing it.
        let u = cl._publicurls.map(urlstr => Url.parse(urlstr))
            .find(parsedurl =>
                (parsedurl.protocol === "ipfs" && parsedurl.pathname.includes('/ipfs/'))
                || (parsedurl.protocol === "yjs:"));
        if (!u) {
            u = `yjs:/yjs/${ cl.keypair.verifyexportmultihashsha256_58() }`; // Pretty random, but means same test will generate same list
        }
        return [u,u];
    }

    // ======= KEY VALUE TABLES ========

    async p_newdatabase(pubkey) {
        /*
        Build the urls of the database for a public key. No connection is opened.

        pubkey:     Exported public key string, or an object with its own "keypair" whose signingexport() gives one
        resolves:   {publicurl, privateurl} - both the same yjs url
         */
        //if (pubkey instanceof Dweb.PublicPrivate)
        if (Object.prototype.hasOwnProperty.call(pubkey, "keypair")) {
            pubkey = pubkey.keypair.signingexport();
        }
        // By this point pubkey should be an export of a public key of form xyz:abc where xyz
        // specifies the type of public key (NACL VERIFY being the only kind we expect currently)
        const u = `yjs:/yjs/${encodeURIComponent(pubkey)}`;
        return {"publicurl": u, "privateurl": u};
    }

    //TODO maybe change the listmonitor / monitor code for to use "on" and the structure of PP.events
    //TODO but note https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Proxy about Proxy which might be suitable, prob not as doesnt map well to lists
    async p_newtable(pubkey, table) {
        /*
        Build the urls of a table inside the database for pubkey.

        pubkey:     As for p_newdatabase, required
        table:      Name of table, appended to the database urls
        resolves:   {privateurl, publicurl}
        throws:     CodingError if pubkey is missing
         */
        if (!pubkey) throw new errors.CodingError("p_newtable currently requires a pubkey");
        const database = await this.p_newdatabase(pubkey);
        // If have use cases without a database, then call p_newdatabase first
        return { privateurl: `${database.privateurl}/${table}`,  publicurl: `${database.publicurl}/${table}`};  // No action required to create it
    }

    async p_set(url, keyvalues, value) {  // url = yjs:/yjs/database/table
        /*
        Set key values
        keyvalues:  string (key) in which case value should be set there OR
                    object in which case value is ignored
         */
        const y = await this.p_connection(url);
        if (typeof keyvalues === "string") {
            y.share.map.set(keyvalues, canonicaljson.stringify(value));
        } else {
            // NOTE(review): unlike the single-key form these values are stored as given, without stringifying,
            // while the readers JSON.parse any string they find - confirm callers pass suitable values.
            Object.keys(keyvalues).forEach((key) => y.share.map.set(key, keyvalues[key]));
        }
    }

    _p_get(y, keys) {
        /*
        Synchronous read from an already opened connection.

        y:          Y as resolved by p_connection
        keys:       Array of keys, or a single key
        returns:    For an array, a dictionary of key => value; otherwise the single value.
                    String values are JSON.parse'd, anything else (e.g. undefined) is returned as is
         */
        const read = (key) => {
            const val = y.share.map.get(key);
            return typeof val === "string" ? JSON.parse(val) : val;  // Handle undefined
        };
        if (!Array.isArray(keys)) {
            return read(keys);  // Surprisingly this is sync, the p_connection should have synchronised
        }
        const result = {};
        keys.forEach((key) => { result[key] = read(key); });
        return result;
    }

    async p_get(url, keys) {
        /*
        url:        yjs url of table
        keys:       Array of keys, or a single key
        resolves:   As returned by _p_get
         */
        return this._p_get(await this.p_connection(url), keys);
    }

    async p_delete(url, keys) {
        /*
        url:        yjs url of table
        keys:       A single key (string), or an array of keys, to delete
         */
        const y = await this.p_connection(url);
        if (typeof keys === "string") {
            y.share.map.delete(keys);
        } else {
            keys.forEach((key) => y.share.map.delete(key));  // Surprisingly this is sync, the p_connection should have synchronised
        }
    }

    async p_keys(url) {
        /*
        url:        yjs url of table
        resolves:   Whatever the shared map's keys() returns
         */
        const y = await this.p_connection(url);
        return y.share.map.keys();   // Surprisingly this is sync, the p_connection should have synchronised
    }

    async p_getall(url) {
        /*
        url:        yjs url of table
        resolves:   Dictionary of every key => value in the table (values decoded as in _p_get)
         */
        const y = await this.p_connection(url);
        const keys = y.share.map.keys();   // Surprisingly this is sync, the p_connection should have synchronised
        return this._p_get(y, keys);
    }

    async p_rawfetch(url) {
        /*
        url:        yjs url of table
        resolves:   { table: "keyvaluetable", _map: dictionary as from p_getall }
         */
        return { // See identical structure in TransportHTTP
            table: "keyvaluetable",         //TODO-KEYVALUE its unclear if this is the best way, as maybe want to know the real type of table e.g. domain
            _map: await this.p_getall(url)
        };   // Data struc is ok as SmartDict.p_fetch will pass to KVT constructor
    }

    async monitor(url, callback, {current=false}={}) {
        /*
        Setup a callback called whenever an item is added to a table, typically it would be called immediately after a p_getall to get any more items not returned by p_getall.
        Stack: KVT()|KVT.p_new => KVT.monitor => (a: Transports.monitor => YJS.monitor)(b: dispatchEvent)

        :param url:         string Identifier of table (or parsed URL with a .href)
        :param callback:    function({type, key, value})  Callback for each new item added to the table

        :param current:     boolean - true if want events for current items on table
        :rejects CodingError: if no Y has been opened for url yet
         */
        url = typeof url === "string" ? url : url.href;
        const y = this.yarrays[url];
        if (!y) {
            // A table needs the Y that shares a map, which is what p_connection opens
            throw new errors.CodingError("Should always exist before calling monitor - async call p_connection(url) to create");
        }
        // Stored values are normally JSON strings (see p_set); leave anything else untouched, as _p_get does
        const decode = (val) => (typeof val === "string" ? JSON.parse(val) : val);
        if (current) {
            // Iterate over existing items with callback
            y.share.map.keys().forEach((k) => {
                callback({type: "set", key: k, value: decode(y.share.map.get(k))});
            });
        }
        y.share.map.observe((event) => {
            if (['add','update'].includes(event.type)) { // Currently ignoring deletions.
                debugyjs("YJS monitor: %o %s %s %o", url, event.type, event.name, event.value);
                // ignores event.path (only in observeDeep) and event.object
                if (!(event.type === "update" && event.oldValue === event.value)) {
                    // Dont trigger on update as seeing some loops with p_set
                    const newevent = {
                        "type": {"add": "set", "update": "set", "delete": "delete"}[event.type],
                        "value": decode(event.value),
                        "key": event.name,
                    };
                    callback(newevent);
                }
            }
        });
    }

    static async p_test(opts={}) {
        /*
        Self test: sets up the transport, logs its info, checks it reports connected, then runs the key-value-table test.
        Relies on p_setup, p_info and p_test_kvt, none of which are defined in this class (presumably inherited from Transport).

        opts:       Options passed to p_setup
        throws:     Any error from the steps above (logged first)
         */
        console.log("TransportYJS.test");
        try {
            const transport = await this.p_setup(opts);
            console.log("YJS connected");
            let res = await transport.p_info();
            console.log("TransportYJS info=",res);
            res = await transport.p_status();
            console.assert(res === Transport.STATUS_CONNECTED);
            await transport.p_test_kvt("NACL%20VERIFY");
        } catch(err) {
            console.log("Exception thrown in TransportYJS.test:", err.message);
            throw err;
        }
    }

}
|
//TransportYJS.Y = Y; // Allow node tests to find it
// Register the class under its transport name
Transports._transportclasses["YJS"] = TransportYJS;
// If start using YJS again, it needs something like Y = new yjs; Y.extend(y-array, y-memory ... - see yjs/README.md
// NOTE(review): the last two entries of `scripts` have no ".js" suffix, unlike the others - confirm whether that is intended.
Object.assign(TransportYJS, {
    // Package names this transport depends on, ~130KB
    requires: ["yjs", "y-array", "y-memory", "y-text", "y-map", "y-ipfs-connector", "y-indexeddb"],
    // Script paths for the same packages, ~130KB
    scripts: [
        "yjs/dist/y.js",
        "y-array/dist/y-array.js",
        "y-memory/dist/y-memory.js",
        "y-text/dist/y-text.js",
        "y-map/dist/y-map.js",
        "y-ipfs-connector/dist/y-ipfs-connector",
        "y-indexeddb/dist/y-indexeddb",
    ],
});
|