UNPKG

11.6 kBJavaScriptView Raw
1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT license.
export class AvroParser {
  /**
   * Reads exactly `length` bytes from the stream.
   *
   * @param stream - Source exposing `read(size, { abortSignal })` that resolves to a byte array.
   * @param length - Number of bytes to read.
   * @param options - Optional `abortSignal`, forwarded to `stream.read`.
   * @returns The bytes that were read.
   * @throws Error("Hit stream end.") if fewer than `length` bytes were returned.
   */
  static async readFixedBytes(stream, length, options = {}) {
    const bytes = await stream.read(length, { abortSignal: options.abortSignal });
    if (bytes.length !== length) {
      throw new Error("Hit stream end.");
    }
    return bytes;
  }

  /**
   * Reads a single byte from the stream.
   *
   * @param stream - Stream to read from.
   * @param options - Optional `abortSignal`.
   * @returns The byte that was read.
   * @throws Error("Hit stream end.") if the stream is exhausted.
   */
  static async readByte(stream, options = {}) {
    const buf = await AvroParser.readFixedBytes(stream, 1, options);
    return buf[0];
  }

  // int and long are stored in variable-length zig-zag coding.
  // variable-length: https://lucene.apache.org/core/3_5_0/fileformats.html#VInt
  // zig-zag: https://developers.google.com/protocol-buffers/docs/encoding?csw=1#types
  /**
   * Reads one zig-zag encoded integer stored as a little-endian base-128 varint
   * (7 payload bits per byte; the high bit is set on every byte except the last).
   *
   * The first 28 payload bits are accumulated with 32-bit bitwise operators. Wider
   * values continue in floating-point arithmetic on the already-halved magnitude, so
   * every safe integer (|v| <= 2 ** 53 - 1) is decoded exactly.
   *
   * @param stream - Stream to read from.
   * @param options - Optional `abortSignal`.
   * @returns The decoded integer.
   * @throws Error("Integer overflow.") if the value is outside the safe-integer range,
   *   or if an over-long run of continuation bytes makes the magnitude non-finite.
   */
  static async readZigZagLong(stream, options = {}) {
    let zigZagEncoded = 0;
    let significanceInBit = 0;
    let byte;
    let haveMoreByte;
    do {
      byte = await AvroParser.readByte(stream, options);
      haveMoreByte = byte & 0x80;
      zigZagEncoded |= (byte & 0x7f) << significanceInBit;
      significanceInBit += 7;
    } while (haveMoreByte && significanceInBit < 28); // bitwise operation only works for 32-bit integers

    if (haveMoreByte) {
      // Zig-zag keeps the sign in bit 0 and the magnitude in the remaining bits.
      // Bit 0 is already known exactly, so take it now. Then accumulate only the
      // halved magnitude in floating point: summing the full encoding would lose
      // bit 0 once the total passes 2 ** 53.
      const isNegative = zigZagEncoded & 1;
      let magnitude = zigZagEncoded >>> 1; // encoding bits 1..27, shifted down
      let significanceInFloat = 134217728; // 2 ** 27: weight of encoding bit 28 after halving
      do {
        byte = await AvroParser.readByte(stream, options);
        magnitude += (byte & 0x7f) * significanceInFloat;
        significanceInFloat *= 128; // 2 ** 7
      } while (byte & 0x80);
      const res = isNegative ? -magnitude - 1 : magnitude;
      // isSafeInteger rejects out-of-range values, and also NaN/Infinity
      // (e.g. 0 * Infinity after very many continuation bytes).
      if (!Number.isSafeInteger(res)) {
        throw new Error("Integer overflow.");
      }
      return res;
    }
    return (zigZagEncoded >> 1) ^ -(zigZagEncoded & 1);
  }

  /** Reads an Avro `long` (zig-zag varint). */
  static async readLong(stream, options = {}) {
    return AvroParser.readZigZagLong(stream, options);
  }

  /** Reads an Avro `int` (zig-zag varint). */
  static async readInt(stream, options = {}) {
    return AvroParser.readZigZagLong(stream, options);
  }

  /** Avro `null` occupies no bytes; always resolves to `null`. */
  static async readNull() {
    return null;
  }

  /**
   * Reads an Avro `boolean`: a single byte that must be 0 or 1.
   *
   * @throws Error("Byte was not a boolean.") for any other byte value.
   */
  static async readBoolean(stream, options = {}) {
    const b = await AvroParser.readByte(stream, options);
    if (b === 1) {
      return true;
    }
    else if (b === 0) {
      return false;
    }
    else {
      throw new Error("Byte was not a boolean.");
    }
  }

  /** Reads a 4-byte little-endian IEEE 754 single-precision float. */
  static async readFloat(stream, options = {}) {
    const u8arr = await AvroParser.readFixedBytes(stream, 4, options);
    const view = new DataView(u8arr.buffer, u8arr.byteOffset, u8arr.byteLength);
    return view.getFloat32(0, true); // littleEndian = true
  }

  /** Reads an 8-byte little-endian IEEE 754 double-precision float. */
  static async readDouble(stream, options = {}) {
    const u8arr = await AvroParser.readFixedBytes(stream, 8, options);
    const view = new DataView(u8arr.buffer, u8arr.byteOffset, u8arr.byteLength);
    return view.getFloat64(0, true); // littleEndian = true
  }

  /**
   * Reads Avro `bytes`: a `long` length followed by that many raw bytes.
   *
   * @throws Error("Bytes size was negative.") if the length prefix is negative.
   * @throws Error("Hit stream end.") if the stream ends before `size` bytes are read.
   */
  static async readBytes(stream, options = {}) {
    const size = await AvroParser.readLong(stream, options);
    if (size < 0) {
      throw new Error("Bytes size was negative.");
    }
    // readFixedBytes (rather than a bare stream.read) so that a truncated stream is
    // reported instead of silently returning a short buffer.
    return AvroParser.readFixedBytes(stream, size, options);
  }

  /** Reads Avro `string`: length-prefixed bytes decoded as UTF-8. */
  static async readString(stream, options = {}) {
    const u8arr = await AvroParser.readBytes(stream, options);
    const utf8decoder = new TextDecoder();
    return utf8decoder.decode(u8arr);
  }

  /**
   * Reads one map entry: a string key followed by a value read with `readItemMethod`.
   *
   * @returns `{ key, value }`.
   */
  static async readMapPair(stream, readItemMethod, options = {}) {
    const key = await AvroParser.readString(stream, options);
    // FUTURE: this won't work with readFixed (currently not supported) which needs a length as the parameter.
    const value = await readItemMethod(stream, options);
    return { key, value };
  }

  /**
   * Reads an Avro map: a block-encoded sequence of (string key, value) pairs.
   *
   * @param readItemMethod - `(stream, options) => Promise<value>` that reads one value.
   * @returns A plain object mapping each key to its value (later duplicates win).
   */
  static async readMap(stream, readItemMethod, options = {}) {
    const readPairMethod = (s, opts = {}) => {
      return AvroParser.readMapPair(s, readItemMethod, opts);
    };
    const pairs = await AvroParser.readArray(stream, readPairMethod, options);
    const dict = {};
    for (const { key, value } of pairs) {
      // defineProperty instead of `dict[key] = value`: a "__proto__" key read from the
      // stream would otherwise replace dict's prototype (or be silently dropped).
      Object.defineProperty(dict, key, { value, writable: true, enumerable: true, configurable: true });
    }
    return dict;
  }

  /**
   * Reads an Avro array. Items arrive in blocks, each prefixed by a `long` count.
   * A negative count is followed by a block size (ignored here) and stands for
   * |count| items. A zero count ends the array.
   *
   * @param readItemMethod - `(stream, options) => Promise<item>` that reads one item.
   * @returns All items, in stream order.
   */
  static async readArray(stream, readItemMethod, options = {}) {
    const items = [];
    for (let count = await AvroParser.readLong(stream, options); count !== 0; count = await AvroParser.readLong(stream, options)) {
      if (count < 0) {
        // Ignore block sizes
        await AvroParser.readLong(stream, options);
        count = -count;
      }
      while (count--) {
        const item = await readItemMethod(stream, options);
        items.push(item);
      }
    }
    return items;
  }
}
// Names of the Avro complex (non-primitive) schema types, keyed by an upper-case alias.
var AvroComplex = {
  RECORD: "record",
  ENUM: "enum",
  ARRAY: "array",
  MAP: "map",
  UNION: "union",
  FIXED: "fixed",
};
// Names of the Avro primitive schema types, keyed by an upper-case alias.
var AvroPrimitive = {
  NULL: "null",
  BOOLEAN: "boolean",
  INT: "int",
  LONG: "long",
  FLOAT: "float",
  DOUBLE: "double",
  BYTES: "bytes",
  STRING: "string",
};
/**
 * Renders a schema fragment for an error message. Interpolating a plain object in a
 * template literal yields "[object Object]", which hides the offending schema, so
 * non-string values are rendered as JSON. Values JSON cannot represent fall back to
 * String().
 */
function describeSchema(value) {
  if (typeof value === "string") {
    return value;
  }
  try {
    const json = JSON.stringify(value);
    return json === undefined ? String(value) : json;
  }
  catch (err) {
    return String(value);
  }
}

export class AvroType {
  /**
   * Determines the AvroType from the Avro Schema.
   *
   * @param schema - A schema in one of three forms: a primitive type name (string),
   *   a union (array of schemas), or an object with a `type` attribute.
   * @returns An AvroType instance able to read values of that schema.
   * @throws Error if the schema uses an unsupported or malformed type.
   */
  static fromSchema(schema) {
    if (typeof schema === "string") {
      return AvroType.fromStringSchema(schema);
    }
    else if (Array.isArray(schema)) {
      return AvroType.fromArraySchema(schema);
    }
    else {
      return AvroType.fromObjectSchema(schema);
    }
  }

  /**
   * Builds a primitive type from its name.
   *
   * @throws Error("Unexpected Avro type ...") if `schema` is not a primitive type name.
   */
  static fromStringSchema(schema) {
    switch (schema) {
      case AvroPrimitive.NULL:
      case AvroPrimitive.BOOLEAN:
      case AvroPrimitive.INT:
      case AvroPrimitive.LONG:
      case AvroPrimitive.FLOAT:
      case AvroPrimitive.DOUBLE:
      case AvroPrimitive.BYTES:
      case AvroPrimitive.STRING:
        return new AvroPrimitiveType(schema);
      default:
        throw new Error(`Unexpected Avro type ${schema}`);
    }
  }

  /** Builds a union type whose branches are the array's schemas, in order. */
  static fromArraySchema(schema) {
    return new AvroUnionType(schema.map((branch) => AvroType.fromSchema(branch)));
  }

  /**
   * Builds a type from an object schema. Supports primitives written in object form
   * (e.g. `{ "type": "int" }`), records, enums and maps. Arrays, fixed types and
   * aliases are rejected.
   *
   * @throws Error naming the offending schema if it is unsupported or misses a required attribute.
   */
  static fromObjectSchema(schema) {
    const type = schema.type;
    // Primitives can be defined as strings or objects. Check membership explicitly
    // instead of calling fromStringSchema and swallowing its error.
    if (Object.values(AvroPrimitive).includes(type)) {
      return AvroType.fromStringSchema(type);
    }
    switch (type) {
      case AvroComplex.RECORD: {
        if (schema.aliases) {
          throw new Error(`aliases currently is not supported, schema: ${describeSchema(schema)}`);
        }
        if (!schema.name) {
          throw new Error(`Required attribute 'name' doesn't exist on schema: ${describeSchema(schema)}`);
        }
        if (!schema.fields) {
          throw new Error(`Required attribute 'fields' doesn't exist on schema: ${describeSchema(schema)}`);
        }
        const fields = {};
        for (const field of schema.fields) {
          // defineProperty rather than assignment: a field named "__proto__" (a legal
          // Avro name) must become an own property. Otherwise it replaces the
          // prototype and is skipped when the record is read.
          Object.defineProperty(fields, field.name, {
            value: AvroType.fromSchema(field.type),
            writable: true,
            enumerable: true,
            configurable: true,
          });
        }
        return new AvroRecordType(fields, schema.name);
      }
      case AvroComplex.ENUM:
        if (schema.aliases) {
          throw new Error(`aliases currently is not supported, schema: ${describeSchema(schema)}`);
        }
        if (!schema.symbols) {
          throw new Error(`Required attribute 'symbols' doesn't exist on schema: ${describeSchema(schema)}`);
        }
        return new AvroEnumType(schema.symbols);
      case AvroComplex.MAP:
        if (!schema.values) {
          throw new Error(`Required attribute 'values' doesn't exist on schema: ${describeSchema(schema)}`);
        }
        return new AvroMapType(AvroType.fromSchema(schema.values));
      case AvroComplex.ARRAY: // Unused today
      case AvroComplex.FIXED: // Unused today
      default:
        throw new Error(`Unexpected Avro type ${describeSchema(type)} in ${describeSchema(schema)}`);
    }
  }
}
/**
 * An Avro primitive type ("null", "boolean", "int", ...). Values are read by
 * delegating to the matching AvroParser routine.
 */
class AvroPrimitiveType extends AvroType {
  /**
   * @param primitive - One of the AvroPrimitive string values.
   */
  constructor(primitive) {
    super();
    this._primitive = primitive;
  }

  /**
   * Reads one value of this primitive type from the stream.
   *
   * @param stream - Stream to read from.
   * @param options - Read options, forwarded to the parser.
   * @returns A promise for the decoded value.
   * @throws Error("Unknown Avro Primitive") synchronously if the stored primitive is not recognized.
   */
  read(stream, options = {}) {
    const kind = this._primitive;
    if (kind === AvroPrimitive.NULL) {
      return AvroParser.readNull();
    }
    if (kind === AvroPrimitive.BOOLEAN) {
      return AvroParser.readBoolean(stream, options);
    }
    if (kind === AvroPrimitive.INT) {
      return AvroParser.readInt(stream, options);
    }
    if (kind === AvroPrimitive.LONG) {
      return AvroParser.readLong(stream, options);
    }
    if (kind === AvroPrimitive.FLOAT) {
      return AvroParser.readFloat(stream, options);
    }
    if (kind === AvroPrimitive.DOUBLE) {
      return AvroParser.readDouble(stream, options);
    }
    if (kind === AvroPrimitive.BYTES) {
      return AvroParser.readBytes(stream, options);
    }
    if (kind === AvroPrimitive.STRING) {
      return AvroParser.readString(stream, options);
    }
    throw new Error("Unknown Avro Primitive");
  }
}
/** An Avro enum type whose encoded value is an index into its symbol list. */
class AvroEnumType extends AvroType {
  /**
   * @param symbols - The enum's symbols, in schema order.
   */
  constructor(symbols) {
    super();
    this._symbols = symbols;
  }

  /**
   * Reads an enum value: an `int` index, resolved to its symbol.
   *
   * @param stream - Stream to read from.
   * @param options - Read options, forwarded to the parser.
   * @returns The symbol at the decoded index.
   * @throws Error if the index does not identify a symbol. Corrupt data is reported
   *   instead of silently yielding `undefined`.
   */
  async read(stream, options = {}) {
    const value = await AvroParser.readInt(stream, options);
    if (!Number.isInteger(value) || value < 0 || value >= this._symbols.length) {
      throw new Error(`Enum index ${value} is out of range for ${this._symbols.length} symbols.`);
    }
    return this._symbols[value];
  }
}
/** An Avro union: an `int` branch index followed by a value of that branch's type. */
class AvroUnionType extends AvroType {
  /**
   * @param types - The union's branch types, in schema order.
   */
  constructor(types) {
    super();
    this._types = types;
  }

  /**
   * Reads the branch index, then a value of the selected branch type.
   *
   * @param stream - Stream to read from.
   * @param options - Read options, forwarded to the parser and the branch reader.
   * @returns A promise for the value read by the selected branch.
   * @throws Error if the index does not identify a branch. Without this check, an
   *   out-of-range index would surface as an opaque TypeError on `undefined.read`.
   */
  async read(stream, options = {}) {
    const typeIndex = await AvroParser.readInt(stream, options);
    if (!Number.isInteger(typeIndex) || typeIndex < 0 || typeIndex >= this._types.length) {
      throw new Error(`Union type index ${typeIndex} is out of range for a union of ${this._types.length} types.`);
    }
    return this._types[typeIndex].read(stream, options);
  }
}
/** An Avro map with string keys and values of a single item type. */
class AvroMapType extends AvroType {
  /**
   * @param itemType - AvroType used to read every value in the map.
   */
  constructor(itemType) {
    super();
    this._itemType = itemType;
  }

  /**
   * Reads a whole map, using the item type to read each value.
   *
   * @param stream - Stream to read from.
   * @param options - Read options, forwarded to the parser.
   * @returns A promise for an object mapping keys to decoded values.
   */
  read(stream, options = {}) {
    const valueReader = (source, readOptions) => this._itemType.read(source, readOptions);
    return AvroParser.readMap(stream, valueReader, options);
  }
}
/** An Avro record: a fixed sequence of named fields read one after another. */
class AvroRecordType extends AvroType {
  /**
   * @param fields - Object mapping each field name to its AvroType; fields are read
   *   in the order of its own keys.
   * @param name - Record name, copied into every decoded record as `$schema`.
   */
  constructor(fields, name) {
    super();
    this._fields = fields;
    this._name = name;
  }

  /**
   * Reads one record by reading each field from the stream in turn.
   *
   * @param stream - Stream to read from.
   * @param options - Read options, forwarded to each field reader.
   * @returns A plain object with `$schema` set to the record name, plus one property per field.
   */
  async read(stream, options = {}) {
    const record = {};
    record["$schema"] = this._name;
    for (const key of Object.keys(this._fields)) {
      const value = await this._fields[key].read(stream, options);
      // defineProperty rather than assignment: a field named "__proto__" (a legal
      // Avro name) must become an own data property, not replace record's prototype.
      Object.defineProperty(record, key, { value, writable: true, enumerable: true, configurable: true });
    }
    return record;
  }
}
311//# sourceMappingURL=AvroParser.js.map
\No newline at end of file