1 |
|
2 |
|
export class AvroParser {
  /**
   * Reads exactly `length` bytes from the stream.
   *
   * @param stream - Object exposing `read(size, { abortSignal })` that resolves to a Uint8Array.
   * @param length - Number of bytes to read.
   * @param options - Optional `{ abortSignal }`, forwarded to `stream.read`.
   * @returns The bytes that were read.
   * @throws Error("Hit stream end.") when the stream returned fewer than `length` bytes.
   */
  static async readFixedBytes(stream, length, options = {}) {
    const bytes = await stream.read(length, { abortSignal: options.abortSignal });
    if (bytes.length !== length) {
      throw new Error("Hit stream end.");
    }
    return bytes;
  }

  /** Reads a single byte from the stream and returns its numeric value. */
  static async readByte(stream, options = {}) {
    const buf = await AvroParser.readFixedBytes(stream, 1, options);
    return buf[0];
  }

  /**
   * Reads a zig-zag encoded variable-length integer (Avro `int` / `long`).
   *
   * Avro longs are 64-bit and their varint encoding is at most 10 bytes long.
   * Values that are not safe JS integers cannot be represented exactly and are rejected.
   *
   * @throws Error("Integer overflow.") if the decoded value is not a safe integer
   *   or the encoding is longer than 10 bytes.
   */
  static async readZigZagLong(stream, options = {}) {
    // JS bitwise operators work on 32-bit integers, so only the first 28 payload
    // bits (4 bytes) are collected with them.
    let low = 0;
    let shift = 0;
    let byte;
    do {
      byte = await AvroParser.readByte(stream, options);
      low |= (byte & 0x7f) << shift;
      shift += 7;
    } while ((byte & 0x80) !== 0 && shift < 28);
    if ((byte & 0x80) === 0) {
      return (low >> 1) ^ -(low & 1);
    }

    // The remaining bytes (at most 6 more) are accumulated separately with float
    // arithmetic. Keeping them apart from `low` preserves bit 0 (the zig-zag sign),
    // which would be lost to rounding if the whole encoded value exceeded 2 ** 53.
    const maxHighBytes = 6;
    let high = 0;
    let scale = 1;
    let highBytes = 0;
    do {
      if (highBytes === maxHighBytes) {
        throw new Error("Integer overflow.");
      }
      byte = await AvroParser.readByte(stream, options);
      high += (byte & 0x7f) * scale;
      scale *= 128;
      highBytes++;
    } while ((byte & 0x80) !== 0);

    // Zig-zag decode: magnitude is (encoded >>> 1); odd encodings are negative.
    const magnitude = (low >>> 1) + high * 134217728; // 2 ** 27
    const res = (low & 1) === 1 ? -magnitude - 1 : magnitude;
    if (!Number.isSafeInteger(res)) {
      throw new Error("Integer overflow.");
    }
    return res;
  }

  /** Reads an Avro `long` (zig-zag varint). */
  static async readLong(stream, options = {}) {
    return AvroParser.readZigZagLong(stream, options);
  }

  /** Reads an Avro `int` (same zig-zag varint encoding as `long`). */
  static async readInt(stream, options = {}) {
    return AvroParser.readZigZagLong(stream, options);
  }

  /** Avro `null` occupies zero bytes; nothing is read from the stream. */
  static async readNull() {
    return null;
  }

  /**
   * Reads an Avro `boolean` encoded as a single byte (1 = true, 0 = false).
   * @throws Error("Byte was not a boolean.") for any other byte value.
   */
  static async readBoolean(stream, options = {}) {
    const b = await AvroParser.readByte(stream, options);
    if (b === 1) {
      return true;
    }
    if (b === 0) {
      return false;
    }
    throw new Error("Byte was not a boolean.");
  }

  /** Reads a 4-byte little-endian IEEE-754 float. */
  static async readFloat(stream, options = {}) {
    const u8arr = await AvroParser.readFixedBytes(stream, 4, options);
    const view = new DataView(u8arr.buffer, u8arr.byteOffset, u8arr.byteLength);
    return view.getFloat32(0, true);
  }

  /** Reads an 8-byte little-endian IEEE-754 double. */
  static async readDouble(stream, options = {}) {
    const u8arr = await AvroParser.readFixedBytes(stream, 8, options);
    const view = new DataView(u8arr.buffer, u8arr.byteOffset, u8arr.byteLength);
    return view.getFloat64(0, true);
  }

  /**
   * Reads Avro `bytes`: a `long` length prefix followed by that many raw bytes.
   * @throws Error("Bytes size was negative.") for a negative length prefix.
   * @throws Error("Hit stream end.") when the stream ends before the payload is complete.
   */
  static async readBytes(stream, options = {}) {
    const size = await AvroParser.readLong(stream, options);
    if (size < 0) {
      throw new Error("Bytes size was negative.");
    }
    // Go through readFixedBytes so a truncated payload is reported instead of
    // being returned short.
    return AvroParser.readFixedBytes(stream, size, options);
  }

  /** Reads an Avro `string`: length-prefixed bytes decoded as UTF-8. */
  static async readString(stream, options = {}) {
    const u8arr = await AvroParser.readBytes(stream, options);
    const utf8decoder = new TextDecoder();
    return utf8decoder.decode(u8arr);
  }

  /** Reads one map entry: a string key followed by a value read with `readItemMethod`. */
  static async readMapPair(stream, readItemMethod, options = {}) {
    const key = await AvroParser.readString(stream, options);
    // The value is read with the caller-supplied method for the map's value type.
    const value = await readItemMethod(stream, options);
    return { key, value };
  }

  /**
   * Reads an Avro map (a block-encoded array of key/value pairs) into a plain object.
   * Later duplicate keys overwrite earlier ones.
   */
  static async readMap(stream, readItemMethod, options = {}) {
    const readPairMethod = (s, opts = {}) => AvroParser.readMapPair(s, readItemMethod, opts);
    const pairs = await AvroParser.readArray(stream, readPairMethod, options);
    const dict = {};
    for (const { key, value } of pairs) {
      // Keys come from the data stream. defineProperty (unlike assignment) keeps a
      // "__proto__" key as an ordinary own property instead of swapping the prototype.
      Object.defineProperty(dict, key, {
        value,
        writable: true,
        enumerable: true,
        configurable: true,
      });
    }
    return dict;
  }

  /**
   * Reads an Avro array: a sequence of blocks, each prefixed by an item count and
   * terminated by a zero count. Items are read sequentially with `readItemMethod`.
   */
  static async readArray(stream, readItemMethod, options = {}) {
    const items = [];
    for (let count = await AvroParser.readLong(stream, options); count !== 0; count = await AvroParser.readLong(stream, options)) {
      if (count < 0) {
        // A negative count is followed by the block's size in bytes, which is not needed here.
        await AvroParser.readLong(stream, options);
        count = -count;
      }
      while (count--) {
        const item = await readItemMethod(stream, options);
        items.push(item);
      }
    }
    return items;
  }
}
|
// Schema `type` names of the complex Avro types.
const AvroComplex = {
  RECORD: "record",
  ENUM: "enum",
  ARRAY: "array",
  MAP: "map",
  UNION: "union",
  FIXED: "fixed",
};
|
// Schema `type` names of the primitive Avro types.
const AvroPrimitive = {
  NULL: "null",
  BOOLEAN: "boolean",
  INT: "int",
  LONG: "long",
  FLOAT: "float",
  DOUBLE: "double",
  BYTES: "bytes",
  STRING: "string",
};
|
export class AvroType {
  /**
   * Builds a reader type from a parsed (JSON) Avro schema.
   *
   * @param schema - A primitive type name (string), a union (array), or an object schema.
   * @returns An AvroType subclass instance for that schema.
   * @throws Error for unsupported or malformed schemas.
   */
  static fromSchema(schema) {
    if (typeof schema === "string") {
      return AvroType.fromStringSchema(schema);
    }
    if (Array.isArray(schema)) {
      return AvroType.fromArraySchema(schema);
    }
    return AvroType.fromObjectSchema(schema);
  }

  /**
   * Builds a primitive type from its name (e.g. "int", "string").
   * @throws Error when the name is not an Avro primitive.
   */
  static fromStringSchema(schema) {
    switch (schema) {
      case AvroPrimitive.NULL:
      case AvroPrimitive.BOOLEAN:
      case AvroPrimitive.INT:
      case AvroPrimitive.LONG:
      case AvroPrimitive.FLOAT:
      case AvroPrimitive.DOUBLE:
      case AvroPrimitive.BYTES:
      case AvroPrimitive.STRING:
        return new AvroPrimitiveType(schema);
      default:
        throw new Error(`Unexpected Avro type ${schema}`);
    }
  }

  /** Builds a union type; each array element is the schema of one branch. */
  static fromArraySchema(schema) {
    return new AvroUnionType(schema.map((branch) => AvroType.fromSchema(branch)));
  }

  /**
   * Builds a type from an object schema: a wrapped primitive ({ "type": "int" }),
   * a record, an enum, or a map. Arrays and fixed types are not supported.
   * @throws Error for unsupported types or missing required attributes.
   */
  static fromObjectSchema(schema) {
    const type = schema.type;
    // Interpolating the schema object directly would only print "[object Object]".
    const describe = () => {
      try {
        return JSON.stringify(schema);
      } catch (err) {
        return String(schema);
      }
    };

    // An object schema may simply wrap a primitive type name.
    if (Object.values(AvroPrimitive).includes(type)) {
      return AvroType.fromStringSchema(type);
    }

    switch (type) {
      case AvroComplex.RECORD: {
        if (schema.aliases) {
          throw new Error(`aliases currently is not supported, schema: ${describe()}`);
        }
        if (!schema.name) {
          throw new Error(`Required attribute 'name' doesn't exist on schema: ${describe()}`);
        }
        if (!schema.fields) {
          throw new Error(`Required attribute 'fields' doesn't exist on schema: ${describe()}`);
        }
        const fields = {};
        for (const field of schema.fields) {
          // defineProperty keeps a field named "__proto__" as an own, enumerable
          // property; plain assignment would replace the prototype and drop the field.
          Object.defineProperty(fields, field.name, {
            value: AvroType.fromSchema(field.type),
            writable: true,
            enumerable: true,
            configurable: true,
          });
        }
        return new AvroRecordType(fields, schema.name);
      }
      case AvroComplex.ENUM:
        if (schema.aliases) {
          throw new Error(`aliases currently is not supported, schema: ${describe()}`);
        }
        if (!schema.symbols) {
          throw new Error(`Required attribute 'symbols' doesn't exist on schema: ${describe()}`);
        }
        return new AvroEnumType(schema.symbols);
      case AvroComplex.MAP:
        if (!schema.values) {
          throw new Error(`Required attribute 'values' doesn't exist on schema: ${describe()}`);
        }
        return new AvroMapType(AvroType.fromSchema(schema.values));
      case AvroComplex.ARRAY:
      case AvroComplex.FIXED:
      default:
        throw new Error(`Unexpected Avro type ${type} in ${describe()}`);
    }
  }
}
|
class AvroPrimitiveType extends AvroType {
  /** @param primitive - The primitive type name this reader decodes. */
  constructor(primitive) {
    super();
    this._primitive = primitive;
  }

  /**
   * Decodes one value by delegating to the AvroParser reader for this primitive.
   * Returns that reader's promise; an unrecognised primitive throws synchronously.
   */
  read(stream, options = {}) {
    const kind = this._primitive;
    if (kind === AvroPrimitive.NULL) {
      return AvroParser.readNull();
    }
    if (kind === AvroPrimitive.BOOLEAN) {
      return AvroParser.readBoolean(stream, options);
    }
    if (kind === AvroPrimitive.INT) {
      return AvroParser.readInt(stream, options);
    }
    if (kind === AvroPrimitive.LONG) {
      return AvroParser.readLong(stream, options);
    }
    if (kind === AvroPrimitive.FLOAT) {
      return AvroParser.readFloat(stream, options);
    }
    if (kind === AvroPrimitive.DOUBLE) {
      return AvroParser.readDouble(stream, options);
    }
    if (kind === AvroPrimitive.BYTES) {
      return AvroParser.readBytes(stream, options);
    }
    if (kind === AvroPrimitive.STRING) {
      return AvroParser.readString(stream, options);
    }
    throw new Error("Unknown Avro Primitive");
  }
}
|
class AvroEnumType extends AvroType {
  /** @param symbols - The enum's symbols; the encoded value is an index into this list. */
  constructor(symbols) {
    super();
    this._symbols = symbols;
  }

  /**
   * Reads an enum value (an Avro `int` index) and returns the matching symbol.
   * @throws Error when the index does not refer to a symbol.
   */
  async read(stream, options = {}) {
    const value = await AvroParser.readInt(stream, options);
    // Reject corrupt indexes instead of silently yielding undefined.
    if (!Number.isInteger(value) || value < 0 || value >= this._symbols.length) {
      throw new Error(`Enum index ${value} is out of range for ${this._symbols.length} symbols.`);
    }
    return this._symbols[value];
  }
}
|
class AvroUnionType extends AvroType {
  /** @param types - The reader type of each union branch, in schema order. */
  constructor(types) {
    super();
    this._types = types;
  }

  /**
   * Reads the branch index (an Avro `int`), then decodes the value with that branch's type.
   * @throws Error when the index does not refer to a branch.
   */
  async read(stream, options = {}) {
    const typeIndex = await AvroParser.readInt(stream, options);
    // Without this check a corrupt index surfaces as an opaque TypeError on `undefined.read`.
    if (!Number.isInteger(typeIndex) || typeIndex < 0 || typeIndex >= this._types.length) {
      throw new Error(`Union type index ${typeIndex} is out of range for ${this._types.length} branches.`);
    }
    return this._types[typeIndex].read(stream, options);
  }
}
|
class AvroMapType extends AvroType {
  /** @param itemType - Reader type used for every map value. */
  constructor(itemType) {
    super();
    this._itemType = itemType;
  }

  /** Decodes an Avro map, reading each value with the configured item type. */
  read(stream, options = {}) {
    return AvroParser.readMap(
      stream,
      (itemStream, itemOptions) => this._itemType.read(itemStream, itemOptions),
      options,
    );
  }
}
|
class AvroRecordType extends AvroType {
  /**
   * @param fields - Object mapping each field name to its reader type, in key order.
   * @param name - The record's schema name, exposed on decoded records as `$schema`.
   */
  constructor(fields, name) {
    super();
    this._fields = fields;
    this._name = name;
  }

  /**
   * Decodes a record by reading its fields one after another, in the key order of
   * `this._fields`. Reads are sequential because each consumes from the shared stream.
   * @returns A plain object holding `$schema` plus one own property per field.
   */
  async read(stream, options = {}) {
    const record = {};
    record["$schema"] = this._name;
    for (const key of Object.keys(this._fields)) {
      const value = await this._fields[key].read(stream, options);
      // defineProperty keeps a field named "__proto__" as an own data property
      // rather than replacing the record's prototype.
      Object.defineProperty(record, key, {
        value,
        writable: true,
        enumerable: true,
        configurable: true,
      });
    }
    return record;
  }
}
|
311 |
|
\ | No newline at end of file |