# JSON Stream

Utilities for working with streaming JSON in Worker Runtimes such as Cloudflare Workers, Deno Deploy and Service Workers.

***

__Work in Progress__: See TODOs & Deprecations

***

## Base Case
The most basic use case is turning a stream of objects into a stream of strings that can be sent over the wire.
On the other end, it can be turned back into a stream of JSON objects.
For this, `JSONStringifyStream` and `JSONParseStream` are all that is required.
They work practically the same as `TextEncoderStream` and `TextDecoderStream`:

```js
const items = [
  { a: 1 },
  { b: 2 },
  { c: 3 },
  'foo',
  { a: { nested: { object: true } } }
];
const stream = toReadableStream(items)
  .pipeThrough(new JSONStringifyStream())
  .pipeThrough(new TextEncoderStream())

// Usage e.g.:
await fetch('/endpoint.json', {
  body: stream,
  method: 'POST',
  headers: [['content-type', 'application/json']]
})

// On the server side:
const collected = [];
await stream // i.e. the incoming request body
  .pipeThrough(new JSONParseStream())
  .pipeTo(new WritableStream({ write(obj) { collected.push(obj) } }))

assertEquals(items, collected)
```

Note that standard JSON is used as the transport format. Unlike ND-JSON,
neither side needs to opt in to the streaming parser/stringifier to accept the data.
For example, this is just as valid:

```js
const collected = await new Response(stream).json()
```

~~If, on the other hand, ND-JSON is sufficient for your use case, this module also provides `NDJSONStringifyStream` and `NDJSONParseStream` that work the same way as shown above, but lack the following features.~~ (TODO: make separate module?)

## Using JSON Path to locate nested data
__JSON Stream__ also supports more complex use cases. Assume JSON of the following structure:

```jsonc
// filename: "nested.json"
{
  "type": "foo",
  "items": [
    { "a": 1 },
    { "b": 2 },
    { "c": 3 },
    // ...
    { "zzz": 999 }
  ]
}
```

Here, the example code from above wouldn't work (or at least not as you would expect),
because by default `JSONParseStream` emits the objects that are the immediate children of the root object.
However, the constructor accepts a JSONPath-like string to locate the desired data to parse:

```js
const collected = [];
await (await fetch('/nested.json')).body
  .pipeThrough(new JSONParseStream('$.items.*')) // <-- new
  .pipeTo(new WritableStream({ write(obj) { collected.push(obj) } }))
```

It's important to add the `.*` at the end, otherwise the entire items array will arrive in a single call once it is fully parsed.
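
For comparison, without the wildcard the parser delivers the array as a single value. A sketch of the same pipeline, differing only in the path:

```js
// '$.items' (no '.*') emits the items array itself,
// so `write` is called exactly once, with the complete array:
await (await fetch('/nested.json')).body
  .pipeThrough(new JSONParseStream('$.items'))
  .pipeTo(new WritableStream({ write(arr) { console.log(arr.length) } }))
```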

`JSONParseStream` only supports a subset of JSONPath; specifically, eval (`@`) expressions and negative slices are omitted.
Below is a table showing some examples:

| JSONPath                  | Description                                                  |
|:--------------------------|:-------------------------------------------------------------|
| `$.*`                     | All direct children of the root. Default.                    |
| `$.store.book[*].author`  | The authors of all books in the store                        |
| `$..author`               | All authors                                                  |
| `$.store.*`               | All things in store, which are some books and a red bicycle  |
| `$.store..price`          | The price of everything in the store                         |
| `$..book[2]`              | The third book                                               |
| `$..book[0,1]`            | The first two books via subscript union                      |
| `$..book[:2]`             | The first two books via subscript array slice                |
| `$..*`                    | All members of the JSON structure                            |
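
To make the table concrete, here is a sketch that collects the authors of all books, using the classic "store" document from the JSONPath examples (the `/store.json` endpoint is hypothetical):

```js
// Stream only the author values out of a larger document:
const authors = [];
await (await fetch('/store.json')).body
  .pipeThrough(new JSONParseStream('$.store.book[*].author'))
  .pipeTo(new WritableStream({ write(author) { authors.push(author) } }))
```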

## Streaming Complex Data
You might also be interested in how to stream complex data such as the structure above from memory.
In that case `JSONStringifyStream` isn't too helpful, as it only supports JSON arrays (i.e. the root element is an array `[]`).

For that case __JSON Stream__ provides the `jsonStringifyStream` function (TODO: better name to indicate that it is a ReadableStream? Change to ReadableStream subclass? Export `JSONStream` object with `stringify` method?), which accepts any JSON-ifiable value as its argument. It is mostly compatible with `JSON.stringify` (TODO: replacer & spaces), but with the important exception that it "inlines" any `Promise`, `ReadableStream` and `AsyncIterable` it encounters. Again, an example:

```js
const stream = jsonStringifyStream({
  type: Promise.resolve('foo'),
  items: (async function* () {
    yield { a: 1 }
    yield { b: 2 }
    yield { c: 3 }
    // Can also have nested async values:
    yield Promise.resolve({ zzz: 999 })
  })(),
})

new Response(stream.pipeThrough(new TextEncoderStream()), {
  headers: [['content-type', 'application/json']]
})
```

Inspecting this on the network would show the following (where every newline is a chunk):
```json
{
"type":
"foo"
,
"items":
[
{
"a":
1
}
,
{
"b":
2
}
,
{
"c":
3
}
,
{
"zzz":
999
}
]
}
```
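
Since the result is standard JSON, the receiving end can use any of the approaches shown earlier. For example, a sketch that consumes the items on the server (assuming `request` carries the body produced above):

```js
// Pick the items out of the streamed structure as they arrive:
const items = [];
await request.body
  .pipeThrough(new JSONParseStream('$.items.*'))
  .pipeTo(new WritableStream({ write(item) { items.push(item) } }))
// items: [{ a: 1 }, { b: 2 }, { c: 3 }, { zzz: 999 }]
```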

## Retrieving Complex Structures
By providing a JSON Path to `JSONParseStream` we can stream the values of a single, nested array.
For scenarios where the JSON structure is more complex, there is the `JSONParseNexus` (TODO: better name) class.
It provides promise- and stream-based methods that accept JSON paths to retrieve a single value or multiple values respectively.
While it is much more powerful and can restore arbitrarily complex structures, it is also more difficult to use.

It's best to explain by example. Assuming the data structure from above, we have:

```js
const parser = new JSONParseNexus();
const data = {
  type: parser.promise('$.type'),
  items: parser.stream('$.items.*'),
};
(await fetch('/nested.json')).body
  .pipeThrough(parser) // <-- new

assertEquals(await data.type, 'foo')

// We can collect the values as before:
const collected = [];
await data.items
  .pipeTo(new WritableStream({ write(obj) { collected.push(obj) } }))
```

While this works just fine, it becomes more complicated when there are multiple streams and values involved.

### Managing Internal Queues
It's important to understand that `JSONParseNexus` provides mostly pull-based APIs.
In the case of `.stream()` and `.iterable()`, no work is being done until a consumer requests a value by calling `.read()` or `.next()` respectively.
However, once a value is requested, `JSONParseNexus` will parse values until the requested JSON path is found.
Along the way it will fill up queues for any other requested JSON paths it encounters.
This means that memory usage can grow arbitrarily large unless the data is processed in the order it was stringified.
Take, for example, the following structure:

```js
const parser = new JSONParseNexus();

jsonStringifyStream({
  xs: new Array(10_000).fill({ x: 'x' }),
  ys: new Array(10_000).fill({ y: 'y' }),
}).pipeThrough(parser)

for await (const y of parser.iterable('$.ys.*')) console.log(y)
for await (const x of parser.iterable('$.xs.*')) console.log(x)
```

In this example the Ys are processed before the Xs, but they were stringified in the opposite order.
This means the internal queue of Xs grows to 10,000 entries before it is processed by the second loop.
This can be avoided by changing the iteration order to match the stringification order.
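
Concretely, swapping the two loops keeps the internal queues from filling up:

```js
// Matches the stringification order (xs first, then ys),
// so each value is consumed as soon as it is parsed:
for await (const x of parser.iterable('$.xs.*')) console.log(x)
for await (const y of parser.iterable('$.ys.*')) console.log(y)
```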

### Single Values and Lazy Promises
Special attention has to be given to single values, as Promises in JS are eager by default and have no concept of "pulling" data.
`JSONParseNexus` therefore introduces a lazy promise type that behaves differently.
As with the async iterables and streams provided by `.iterable` and `.stream`, it does not pull values from the underlying readable until requested.
This happens when `await`ing the promise, i.e. when calling its `.then` method; otherwise it stays idle.

```js
const parser = new JSONParseNexus();

jsonStringifyStream({
  type: 'items',
  items: new Array(10_000).fill({ x: 'x' }),
  trailer: 'trail',
}).pipeThrough(parser)

const data = {
  type: await parser.promise('$.type'), // ok
  items: parser.iterable('$.items.*'),
  trailer: parser.promise('$.trailer'), // do not await!
}

console.log(data.type) //=> 'items'

// Now async iteration is in control of the parser:
for await (const x of data.items) {
  console.log(x)
}
// Now we can await the trailer:
console.log(await data.trailer) //=> 'trail'
```

In the above example, without lazy promises `parser.promise('$.trailer')` would immediately parse the entire JSON structure, which involves filling a queue with 10,000 elements.

In order to transform a value without triggering execution,
the class provides a `.map` function that works similarly to the one on JS arrays:

```js
const trailer = parser.promise('$.trailer').map(x => x.toUpperCase())
```
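
Presumably the mapped promise is itself still lazy, so it can be placed in a structure and awaited after the other values have been consumed (a sketch, reusing the data from the previous example):

```js
const data = {
  // ...
  trailer: parser.promise('$.trailer').map(x => x.toUpperCase()), // still lazy
}
// ...consume the items first, then:
console.log(await data.trailer) //=> 'TRAIL'
```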

## Limitations
**JSON Stream** largely consists of old Node libraries that have been modified to work in Worker Runtimes and the browser.
Currently they are not "integrated": for example, specifying a JSON path does not limit the amount of parsing the parser does.

The stringification implementation, which is original to this module, relies heavily on async generators. These are comparatively slow, but they made the implementation quick and easy.

**JSON Stream** heavily relies on [`TransformStream`](https://developer.mozilla.org/en-US/docs/Web/API/TransformStream), which has only recently shipped in Chrome & Safari and is still behind a flag in Firefox. However, the latest versions of Deno and Cloudflare Workers support it (might require compatibility flags in CF Workers).

## Appendix
### To ReadableStream Function
An example above uses a `toReadableStream` function, which can be implemented as follows:
```ts
function toReadableStream<T>(iter: Iterable<T>) {
  const xs = [...iter];
  return new ReadableStream<T>({
    pull(ctrl) {
      // Enqueue one item per pull. Checking the length (rather than the
      // shifted value) keeps falsy items such as 0 or '' from closing
      // the stream prematurely.
      if (xs.length) ctrl.enqueue(xs.shift()!); else ctrl.close();
    },
  });
}
```
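
For async iterables, a similar helper might look like this (a sketch, not part of the module; `jsonStringifyStream` already accepts async iterables directly):

```ts
function toReadableStreamAsync<T>(iter: AsyncIterable<T>) {
  const it = iter[Symbol.asyncIterator]();
  return new ReadableStream<T>({
    // `pull` may return a promise, so we can await the iterator here:
    async pull(ctrl) {
      const { done, value } = await it.next();
      if (done) ctrl.close(); else ctrl.enqueue(value);
    },
  });
}
```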