# JSON Stream

Utilities for working with streaming JSON in Worker Runtimes such as Cloudflare Workers, Deno Deploy and Service Workers.

***

__Work in Progress__: See TODOs & Deprecations

***

## Base Case
The most basic use case is turning a stream of objects into a stream of strings that can be sent over the wire.
On the other end, it can be turned back into a stream of JSON objects.
For this, `JSONStringifyStream` and `JSONParseStream` are all that is required.
They work practically the same as `TextEncoderStream` and `TextDecoderStream`:
```js
const items = [
  { a: 1 },
  { b: 2 },
  { c: 3 },
  'foo',
  { a: { nested: { object: true } } }
];
const stream = toReadableStream(items)
  .pipeThrough(new JSONStringifyStream())
  .pipeThrough(new TextEncoderStream())

// Usage e.g.:
await fetch('/endpoint.json', {
  body: stream,
  method: 'POST',
  headers: [['content-type', 'application/json']]
})

// On the server side:
const collected = [];
await stream
  .pipeThrough(new JSONParseStream())
  .pipeTo(new WritableStream({ write(obj) { collected.push(obj) } }))

assertEquals(items, collected)
```
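
For completeness, here is a sketch of what the receiving end could look like in a Worker-style runtime. The route and handler name are illustrative; it simply pipes the incoming request body through `JSONParseStream`, just like the `fetch` example further below:

```js
// Hypothetical fetch handler for the POST request above:
addEventListener('fetch', event => {
  event.respondWith(handleJSON(event.request));
});

async function handleJSON(request) {
  const collected = [];
  await request.body
    .pipeThrough(new JSONParseStream())
    .pipeTo(new WritableStream({ write(obj) { collected.push(obj) } }));
  return new Response(`received ${collected.length} items`);
}
```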

Note that standard JSON is used as the transport format. Unlike ND-JSON,
neither side needs to opt in to using the streaming parser/stringifier to accept the data.
For example, this is just as valid:

```js
const collected = await new Response(stream).json()
```

~~If, on the other hand, ND-JSON is sufficient for your use case, this module also provides `NDJSONStringifyStream` and `NDJSONParseStream` that work the same way as shown above, but lack the following features.~~ (TODO: make separate module?)

## Using JSON Path to locate nested data
__JSON Stream__ also supports more complex use cases. Assume JSON of the following structure:

```jsonc
// filename: "nested.json"
{
  "type": "foo",
  "items": [
    { "a": 1 },
    { "b": 2 },
    { "c": 3 },
    // ...
    { "zzz": 999 }
  ]
}
```

Here, the example code from above wouldn't work (or at least not as you would expect),
because by default `JSONParseStream` emits the objects that are the immediate children of the root object.
However, the constructor accepts a JSONPath-like string to locate the desired data to parse:

```js
const collected = [];
await (await fetch('/nested.json')).body
  .pipeThrough(new JSONParseStream('$.items.*')) // <-- new
  .pipeTo(new WritableStream({ write(obj) { collected.push(obj) } }))
```

It's important to add the `.*` at the end, otherwise the entire items array will arrive in a single call once it has been fully parsed.

`JSONParseStream` only supports a subset of JSONPath; specifically, eval (`@`) expressions and negative slices are not supported.
Below is a table showing some examples:

| JSONPath                  | Description                                                  |
|:--------------------------|:-------------------------------------------------------------|
| `$.*`                     | All direct children of the root. Default.                    |
| `$.store.book[*].author`  | The authors of all books in the store                        |
| `$..author`               | All authors                                                  |
| `$.store.*`               | All things in store, which are some books and a red bicycle  |
| `$.store..price`          | The price of everything in the store                         |
| `$..book[2]`              | The third book                                               |
| `$..book[0,1]`            | The first two books via subscript union                      |
| `$..book[:2]`             | The first two books via subscript array slice                |
| `$..*`                    | All members of the JSON structure                            |

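For instance, to stream just the author names out of the classic bookstore document behind a hypothetical `/store.json` endpoint, the second path from the table can be passed to the constructor:

```js
// '/store.json' is assumed to serve Goessner's well-known "store" example document:
const authors = [];
await (await fetch('/store.json')).body
  .pipeThrough(new JSONParseStream('$.store.book[*].author'))
  .pipeTo(new WritableStream({ write(author) { authors.push(author) } }))
// `authors` now contains one string per book, emitted as each author was parsed.
```
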
## Streaming Complex Data
You might also be interested in how to stream complex data such as the structure above from memory.
In that case `JSONStringifyStream` isn't too helpful, as it only supports JSON arrays (i.e. the root element is an array `[]`).

For that case __JSON Stream__ provides the `jsonStringifyStream` method (TODO: better name to indicate that it is a readableStream? Change to ReadableStream subclass? Export `JSONStream` object with `stringify` method?), which accepts any JSON-ifiable data as an argument. It is mostly compatible with `JSON.stringify` (TODO: replacer & spaces), but with the important exception that it "inlines" any `Promise`, `ReadableStream` and `AsyncIterable` it encounters. Again, an example:

```js
const stream = jsonStringifyStream({
  type: Promise.resolve('foo'),
  items: (async function* () {
    yield { a: 1 }
    yield { b: 2 }
    yield { c: 3 }
    // Can also have nested async values:
    yield Promise.resolve({ zzz: 999 })
  })(),
})

new Response(stream.pipeThrough(new TextEncoderStream()), {
  headers: [['content-type', 'application/json']]
})
```

Inspecting this on the network would show the following (where every newline is a chunk):
```json
{
"type":
"foo"
,
"items":
[
{
"a":
1
}
,
{
"b":
2
}
,
{
"c":
3
}
,
{
"zzz":
999
}
]
}
```
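
If you just want to inspect the chunking locally rather than on the network tab, you can also pipe the stream into a logging sink instead of a `Response`. This sketch assumes, as the `TextEncoderStream` pipe above implies, that `jsonStringifyStream` emits string chunks:

```js
// Log every chunk on its own line instead of sending it over the wire:
await jsonStringifyStream({ type: 'foo', items: [{ a: 1 }, { b: 2 }] })
  .pipeTo(new WritableStream({ write(chunk) { console.log(chunk) } }))
```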

## Retrieving Complex Structures
By providing a JSON Path to `JSONParseStream` we can stream the values of a single, nested array.
For scenarios where the JSON structure is more complex, there is the `JSONParseNexus` (TODO: better name) class.
It provides promise- and stream-based methods that accept JSON paths to retrieve one or multiple values respectively.
While it is much more powerful and can restore arbitrarily complex structures, it is also more difficult to use.

It's best to explain by example. Assuming the data structure from above, we have:

```js
const parser = new JSONParseNexus();
const data = {
  type: parser.promise('$.type'),
  items: parser.stream('$.items.*'),
};
(await fetch('/nested.json')).body
  .pipeThrough(parser) // <-- new

assertEquals(await data.type, 'foo')

// We can collect the values as before:
const collected = [];
await data.items
  .pipeTo(new WritableStream({ write(obj) { collected.push(obj) } }))
```

While this works just fine, it becomes more complicated when there are multiple streams and values involved.

### Managing Internal Queues
It's important to understand that `JSONParseNexus` provides mostly pull-based APIs.
In the case of `.stream()` and `.iterable()`, no work is being done until a consumer requests a value by calling `.read()` or `.next()` respectively.
However, once a value is requested, `JSONParseNexus` will parse values until the requested JSON path is found.
Along the way it will fill up queues for any other requested JSON paths it encounters.
This means that memory usage can grow arbitrarily large unless the data is processed in the order it was stringified.
Take for example the following structure:

```js
const parser = new JSONParseNexus();

jsonStringifyStream({
  xs: new Array(10_000).fill({ x: 'x' }),
  ys: new Array(10_000).fill({ y: 'y' }),
}).pipeThrough(parser)

for await (const y of parser.iterable('$.ys.*')) console.log(y)
for await (const x of parser.iterable('$.xs.*')) console.log(x)
```

In this example the Ys are being processed before the Xs, but were stringified in the opposite order.
This means the internal queue of Xs grows to 10,000 elements before it is processed by the second loop.
This can be avoided by changing the order to match the stringification order.
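
For the example above, that simply means consuming the Xs first, so that the internal queues stay (close to) empty:

```js
// Same data as above, but processed in the order it was stringified:
for await (const x of parser.iterable('$.xs.*')) console.log(x)
for await (const y of parser.iterable('$.ys.*')) console.log(y)
```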

### Single Values and Lazy Promises
Special attention has to be given to single values, as Promises in JS are eager by default and have no concept of "pulling" data.
`JSONParseNexus` introduces a lazy promise type that has a different behavior.
As with the async iterables and streams provided by `.iterable` and `.stream`, it does not pull values from the underlying readable until requested. This happens when `await`ing the promise, i.e. when calling its `.then` method; otherwise it stays idle.

```js
const parser = new JSONParseNexus();

jsonStringifyStream({
  type: 'items',
  items: new Array(10_000).fill({ x: 'x' }),
  trailer: 'trail',
}).pipeThrough(parser)

const data = {
  type: await parser.promise('$.type'), // ok
  items: parser.iterable('$.items.*'),
  trailer: parser.promise('$.trailer'), // do not await!
}

console.log(data.type) //=> 'items'

// Now async iteration is in control of the parser:
for await (const x of data.items) {
  console.log(x)
}
// Now we can await the trailer:
console.log(await data.trailer)
```

In the above example, without lazy promises `parser.promise('$.trailer')` would immediately parse the entire JSON structure, which involves filling a queue of 10,000 elements.

In order to transform a value without triggering execution,
the class provides a `.map` function that works similarly to the one on JS arrays:

```js
const trailer = parser.promise('$.trailer').map(x => x.toUpperCase())
```

## Limitations
**JSON Stream** largely consists of old Node libraries that have been modified to work in Worker Runtimes and the browser.
Currently they are not "integrated": for example, specifying a specific JSON Path does not limit the amount of parsing the parser does.

The stringification implementation, which is original, relies heavily on async generators, which are "slow", but made the implementation quick and easy to write.

**JSON Stream** heavily relies on [`TransformStream`](https://developer.mozilla.org/en-US/docs/Web/API/TransformStream), which has only recently shipped in Chrome & Safari and is still behind a flag in Firefox. However, the latest versions of Deno and Cloudflare Workers support it (it might require compatibility flags in CF Workers).

## Appendix
### To ReadableStream Function
An example above uses a `toReadableStream` function, which can be implemented as follows:
```ts
function toReadableStream<T>(iter: Iterable<T>) {
  const xs = [...iter];
  return new ReadableStream<T>({
    pull(ctrl) {
      // Check the length (rather than the truthiness of `shift()`) so falsy items like 0 or '' don't end the stream early:
      if (xs.length) ctrl.enqueue(xs.shift()!); else ctrl.close();
    },
  });
}
```