1 |
|
2 |
|
3 | import { __asyncGenerator, __await } from "tslib";
|
4 |
|
5 |
|
6 | import "@azure/core-paging";
|
7 | import { AVRO_CODEC_KEY, AVRO_INIT_BYTES, AVRO_SCHEMA_KEY, AVRO_SYNC_MARKER_SIZE, } from "./AvroConstants";
|
8 | import { AvroParser, AvroType } from "./AvroParser";
|
9 | import { arraysEqual } from "./utils/utils.common";
|
/**
 * Sequential reader for an Avro object-container stream.
 *
 * The header (magic bytes, metadata map, sync marker) is read from the header
 * stream; records are then decoded block by block from the data stream. The
 * reader tracks `blockOffset` and `objectIndex` so a caller can later resume
 * from the same place by passing them back to the constructor.
 */
export class AvroReader {
    /**
     * @param dataStream - Stream the record blocks are read from.
     * @param headerStream - Stream the container header is read from. Falls
     *   back to `dataStream` when omitted.
     * @param currentBlockOffset - Offset of the block to resume from
     *   (defaults to 0).
     * @param indexWithinCurrentBlock - Number of records at the start of that
     *   block to skip during initialization (defaults to 0).
     */
    constructor(dataStream, headerStream, currentBlockOffset, indexWithinCurrentBlock) {
        this._dataStream = dataStream;
        this._headerStream = headerStream || dataStream;
        this._initialized = false;
        this._blockOffset = currentBlockOffset || 0;
        this._objectIndex = indexWithinCurrentBlock || 0;
        this._initialBlockOffset = currentBlockOffset || 0;
    }

    /** Offset of the block currently being read. */
    get blockOffset() {
        return this._blockOffset;
    }

    /** Number of records already consumed from the current block. */
    get objectIndex() {
        return this._objectIndex;
    }

    /**
     * Reads and validates the container header, builds the record type from
     * the embedded schema, and positions the reader on the first record to
     * return (skipping `objectIndex` records when resuming).
     *
     * @param options - Optional bag; `options.abortSignal` is forwarded to
     *   every read.
     * @throws Error "Stream is not an Avro file." when the magic bytes differ.
     * @throws Error "Codecs are not supported" when a codec other than
     *   `"null"` is declared in the metadata.
     */
    async initialize(options = {}) {
        const header = await AvroParser.readFixedBytes(this._headerStream, AVRO_INIT_BYTES.length, {
            abortSignal: options.abortSignal,
        });
        if (!arraysEqual(header, AVRO_INIT_BYTES)) {
            throw new Error("Stream is not an Avro file.");
        }

        // File metadata is written as a map of string values.
        this._metadata = await AvroParser.readMap(this._headerStream, AvroParser.readString, {
            abortSignal: options.abortSignal,
        });

        // Only uncompressed content is handled: the codec entry must be
        // absent or the literal string "null".
        const codec = this._metadata[AVRO_CODEC_KEY];
        if (!(codec === undefined || codec === null || codec === "null")) {
            throw new Error("Codecs are not supported");
        }

        // The marker read here is compared against the one that closes each block.
        this._syncMarker = await AvroParser.readFixedBytes(this._headerStream, AVRO_SYNC_MARKER_SIZE, {
            abortSignal: options.abortSignal,
        });

        // The schema is stored as JSON text inside the metadata map.
        const schema = JSON.parse(this._metadata[AVRO_SCHEMA_KEY]);
        this._itemType = AvroType.fromSchema(schema);

        // No resume offset supplied: record where the data stream currently
        // stands as the offset of the first block.
        if (this._blockOffset === 0) {
            this._blockOffset = this._initialBlockOffset + this._dataStream.position;
        }

        // First long of a block: how many records it holds.
        this._itemsRemainingInBlock = await AvroParser.readLong(this._dataStream, {
            abortSignal: options.abortSignal,
        });
        // Second long of a block is read and discarded (per the Avro container
        // layout this is the block's size in bytes, which is not needed here).
        await AvroParser.readLong(this._dataStream, { abortSignal: options.abortSignal });

        this._initialized = true;

        // When resuming mid-block, decode and drop the records that were
        // already delivered. The loop is a no-op for an index of 0.
        for (let i = 0; i < this._objectIndex; i++) {
            await this._itemType.read(this._dataStream, { abortSignal: options.abortSignal });
            this._itemsRemainingInBlock--;
        }
    }

    /**
     * @returns `true` while the reader has not been initialized yet or the
     *   current block still has unread records.
     */
    hasNext() {
        return !this._initialized || this._itemsRemainingInBlock > 0;
    }

    /**
     * Lazily decodes records one at a time, initializing the reader on first
     * use and advancing across block boundaries.
     *
     * Implemented with the tslib `__asyncGenerator` / `__await` helpers:
     * `yield __await(x)` is the down-levelled form of `await x`, and
     * `yield yield __await(x)` is the down-levelled form of `yield x`.
     *
     * @param options - Optional bag; `options.abortSignal` is forwarded to
     *   every read.
     * @returns An async iterable iterator over the decoded records.
     * @throws Error "Stream is not a valid Avro file." when a block's trailing
     *   sync marker does not match the header's.
     * @throws The original error when a read at a block boundary fails with
     *   an error named "AbortError".
     */
    parseObjects(options = {}) {
        return __asyncGenerator(this, arguments, function* parseObjects_1() {
            if (!this._initialized) {
                yield __await(this.initialize(options));
            }
            while (this.hasNext()) {
                const result = yield __await(this._itemType.read(this._dataStream, {
                    abortSignal: options.abortSignal,
                }));
                this._itemsRemainingInBlock--;
                this._objectIndex++;

                if (this._itemsRemainingInBlock === 0) {
                    // Block exhausted: consume its trailing sync marker and
                    // move the bookkeeping to the start of the next block.
                    const marker = yield __await(AvroParser.readFixedBytes(this._dataStream, AVRO_SYNC_MARKER_SIZE, {
                        abortSignal: options.abortSignal,
                    }));
                    this._blockOffset = this._initialBlockOffset + this._dataStream.position;
                    this._objectIndex = 0;
                    if (!arraysEqual(this._syncMarker, marker)) {
                        throw new Error("Stream is not a valid Avro file.");
                    }

                    try {
                        this._itemsRemainingInBlock = yield __await(AvroParser.readLong(this._dataStream, {
                            abortSignal: options.abortSignal,
                        }));
                    }
                    catch (err) {
                        // A caller-requested abort must surface; otherwise the
                        // consumer would see a clean (but truncated) end of data.
                        if (err && err.name === "AbortError") {
                            throw err;
                        }
                        // Any other failure is treated as the end of the stream.
                        this._itemsRemainingInBlock = 0;
                    }

                    if (this._itemsRemainingInBlock > 0) {
                        // Read and discard the second long of the new block.
                        yield __await(AvroParser.readLong(this._dataStream, { abortSignal: options.abortSignal }));
                    }
                }
                yield yield __await(result);
            }
        });
    }
}
|
107 |
|
\ | No newline at end of file |