UNPKG

4.9 kB · JavaScript · View Raw
1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT license.
3import { __asyncGenerator, __await } from "tslib";
4// TODO: Do a review of non-interfaces
5/* eslint-disable @azure/azure-sdk/ts-use-interface-parameters */
6import "@azure/core-paging";
7import { AVRO_CODEC_KEY, AVRO_INIT_BYTES, AVRO_SCHEMA_KEY, AVRO_SYNC_MARKER_SIZE, } from "./AvroConstants";
8import { AvroParser, AvroType } from "./AvroParser";
9import { arraysEqual } from "./utils/utils.common";
/**
 * Reads objects sequentially from an Avro object container file.
 *
 * The file header (magic bytes, metadata map, sync marker) is read from
 * `headerStream`; data blocks are read from `dataStream`, which must expose a
 * numeric `position`. `blockOffset` / `objectIndex` report the reader's
 * current position; passing them back to the constructor as
 * `currentBlockOffset` / `indexWithinCurrentBlock` (with `headerStream`
 * supplying the header) presumably lets a new reader resume at the same
 * object — TODO confirm against callers.
 */
export class AvroReader {
    /**
     * @param dataStream - Stream the data blocks are read from.
     * @param headerStream - Stream the file header is read from; defaults to `dataStream`.
     * @param currentBlockOffset - Base added to `dataStream.position` when
     *   computing `blockOffset`, and the initial `blockOffset`; defaults to 0.
     * @param indexWithinCurrentBlock - Number of objects in the first block to
     *   skip during `initialize`; defaults to 0.
     */
    constructor(dataStream, headerStream, currentBlockOffset, indexWithinCurrentBlock) {
        this._dataStream = dataStream;
        this._headerStream = headerStream || dataStream;
        this._initialized = false;
        this._blockOffset = currentBlockOffset || 0;
        this._objectIndex = indexWithinCurrentBlock || 0;
        this._initialBlockOffset = currentBlockOffset || 0;
    }
    /**
     * Offset of the block the reader is currently in: the constructor's
     * `currentBlockOffset` plus the data stream position at that block's start.
     */
    get blockOffset() {
        return this._blockOffset;
    }
    /**
     * Number of objects already consumed from the current block; reset to 0
     * whenever a new block begins.
     */
    get objectIndex() {
        return this._objectIndex;
    }
    /**
     * Reads and validates the file header, reads the first block's header and
     * skips `indexWithinCurrentBlock` objects.
     *
     * @param options - Optional `abortSignal`, forwarded to every read.
     * @throws Error if the magic bytes do not match, if a codec other than
     *   "null" is declared, or if any underlying read or schema parse fails.
     */
    async initialize(options = {}) {
        const header = await AvroParser.readFixedBytes(this._headerStream, AVRO_INIT_BYTES.length, {
            abortSignal: options.abortSignal,
        });
        if (!arraysEqual(header, AVRO_INIT_BYTES)) {
            throw new Error("Stream is not an Avro file.");
        }
        // File metadata is written as if defined by the following map schema:
        // { "type": "map", "values": "bytes"}
        this._metadata = await AvroParser.readMap(this._headerStream, AvroParser.readString, {
            abortSignal: options.abortSignal,
        });
        // Only the "null" codec (or no codec entry at all) is supported.
        const codec = this._metadata[AVRO_CODEC_KEY];
        if (!(codec === undefined || codec === null || codec === "null")) {
            throw new Error("Codecs are not supported");
        }
        // The 16-byte, randomly-generated sync marker for this file.
        this._syncMarker = await AvroParser.readFixedBytes(this._headerStream, AVRO_SYNC_MARKER_SIZE, {
            abortSignal: options.abortSignal,
        });
        // Parse the schema
        const schema = JSON.parse(this._metadata[AVRO_SCHEMA_KEY]);
        this._itemType = AvroType.fromSchema(schema);
        // Without a non-zero starting offset, the first block begins at the
        // current data stream position.
        if (this._blockOffset === 0) {
            this._blockOffset = this._initialBlockOffset + this._dataStream.position;
        }
        this._itemsRemainingInBlock = await AvroParser.readLong(this._dataStream, {
            abortSignal: options.abortSignal,
        });
        // skip block length
        await AvroParser.readLong(this._dataStream, { abortSignal: options.abortSignal });
        this._initialized = true;
        // Discard objects that precede the requested index within this block.
        // The loop body never runs for 0, negative or NaN indices.
        for (let i = 0; i < this._objectIndex; i++) {
            await this._itemType.read(this._dataStream, { abortSignal: options.abortSignal });
            this._itemsRemainingInBlock--;
        }
    }
    /**
     * Whether more objects may be available: always true before
     * initialization, afterwards only while the current block has items left.
     */
    hasNext() {
        return !this._initialized || this._itemsRemainingInBlock > 0;
    }
    /**
     * Asynchronously iterates over every remaining object, calling
     * `initialize` first if it has not run yet.
     *
     * After the last object of a block, the block's sync marker is verified and
     * the next block's header is read. Failing to read the next item count is
     * treated as the end of the stream, unless `options.abortSignal` has been
     * aborted, in which case the error is rethrown so cancellation is not
     * mistaken for a clean end of file.
     *
     * @param options - Optional `abortSignal`, forwarded to every read.
     * @throws Error if a block's sync marker does not match the header's, or
     *   if the read is aborted.
     */
    parseObjects(options = {}) {
        return __asyncGenerator(this, arguments, function* parseObjects_1() {
            if (!this._initialized) {
                yield __await(this.initialize(options));
            }
            while (this.hasNext()) {
                const result = yield __await(this._itemType.read(this._dataStream, {
                    abortSignal: options.abortSignal,
                }));
                this._itemsRemainingInBlock--;
                this._objectIndex++;
                if (this._itemsRemainingInBlock === 0) {
                    const marker = yield __await(AvroParser.readFixedBytes(this._dataStream, AVRO_SYNC_MARKER_SIZE, {
                        abortSignal: options.abortSignal,
                    }));
                    this._blockOffset = this._initialBlockOffset + this._dataStream.position;
                    this._objectIndex = 0;
                    if (!arraysEqual(this._syncMarker, marker)) {
                        throw new Error("Stream is not a valid Avro file.");
                    }
                    try {
                        this._itemsRemainingInBlock = yield __await(AvroParser.readLong(this._dataStream, {
                            abortSignal: options.abortSignal,
                        }));
                    }
                    catch (err) {
                        // Cancellation must reach the caller instead of
                        // silently truncating the sequence of objects.
                        if (options.abortSignal && options.abortSignal.aborted) {
                            throw err;
                        }
                        // Otherwise we hit the end of the stream.
                        this._itemsRemainingInBlock = 0;
                    }
                    if (this._itemsRemainingInBlock > 0) {
                        // Ignore block size
                        yield __await(AvroParser.readLong(this._dataStream, { abortSignal: options.abortSignal }));
                    }
                }
                yield yield __await(result);
            }
        });
    }
}
107//# sourceMappingURL=AvroReader.js.map
\No newline at end of file