UNPKG

9.07 kBJavaScriptView Raw
1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT license.
3import { __asyncGenerator, __await } from "tslib";
4import { getPathFromLink, ResourceType, StatusCodes } from "./common";
5import { DefaultQueryExecutionContext, getInitialHeader, mergeHeaders, PipelinedQueryExecutionContext, } from "./queryExecutionContext";
6import { FeedResponse } from "./request/FeedResponse";
/**
 * Represents a QueryIterator Object, an implementation of feed or query response that enables
 * traversal and iterating over the response
 * in the Azure Cosmos DB database service.
 */
export class QueryIterator {
    /**
     * @hidden
     * @param clientContext - client context used to request query plans and to build the pipelined execution context
     * @param query - the query handed to the query plan request and the pipelined execution context
     * @param options - feed options; a missing value is replaced by an empty object
     * @param fetchFunctions - fetch functions handed to the default execution context
     * @param resourceLink - link of the resource being queried
     * @param resourceType - type of the resource being queried; query plans are only used for items
     */
    constructor(clientContext, query, options, fetchFunctions, resourceLink, resourceType) {
        this.clientContext = clientContext;
        this.query = query;
        this.options = options || {};
        this.fetchFunctions = fetchFunctions;
        this.resourceLink = resourceLink;
        this.resourceType = resourceType;
        this.fetchAllLastResHeaders = getInitialHeader();
        this.reset();
        this.isInitialized = false;
    }
    /**
     * Gets an async iterator that will yield results until completion.
     *
     * NOTE: AsyncIterators are a very new feature and you might need to
     * use polyfills/etc. in order to use them in your code.
     *
     * If you're using TypeScript, you can use the following polyfill as long
     * as you target ES6 or higher and are running on Node 6 or higher.
     *
     * ```typescript
     * if (!Symbol || !Symbol.asyncIterator) {
     *   (Symbol as any).asyncIterator = Symbol.for("Symbol.asyncIterator");
     * }
     * ```
     *
     * @example Iterate over all databases
     * ```typescript
     * for await(const { resources: db } of client.databases.readAll().getAsyncIterator()) {
     *   console.log(`Got ${db} from AsyncIterator`);
     * }
     * ```
     */
    getAsyncIterator() {
        // Kept in the tslib down-levelled form so the module still loads on runtimes
        // without native async generator support.
        return __asyncGenerator(this, arguments, function* getAsyncIterator_1() {
            this.reset();
            this.queryPlanPromise = this.fetchQueryPlan();
            while (this.queryExecutionContext.hasMoreResults()) {
                let response;
                try {
                    response = yield __await(this.queryExecutionContext.fetchMore());
                }
                catch (error) {
                    // needsQueryPlan() rethrows errors it does not recognise.
                    if (this.needsQueryPlan(error)) {
                        yield __await(this.createPipelinedExecutionContext());
                        try {
                            response = yield __await(this.queryExecutionContext.fetchMore());
                        }
                        catch (queryError) {
                            // Always throws, so `response` is defined whenever we get past here.
                            this.handleSplitError(queryError);
                        }
                    }
                    else {
                        throw error;
                    }
                }
                const feedResponse = new FeedResponse(response.result, response.headers, this.queryExecutionContext.hasMoreResults());
                // Pages without a result are skipped rather than yielded.
                if (response.result !== undefined) {
                    yield yield __await(feedResponse);
                }
            }
        });
    }
    /**
     * Determine if there are still remaining resources to process based on the value of the continuation token or the
     * elements remaining on the current batch in the QueryIterator.
     * @returns true if there are other elements to process in the QueryIterator.
     */
    hasMoreResults() {
        return this.queryExecutionContext.hasMoreResults();
    }
    /**
     * Fetch all pages for the query and return a single FeedResponse.
     *
     * Every call starts from a fresh execution context, a fresh result buffer and
     * fresh accumulated headers, so repeated calls do not carry over the headers
     * merged during an earlier call.
     *
     * @returns a FeedResponse holding every item and the merged headers of this run
     * @throws a retryable error with `code` 503 when the failure has `code` 410;
     * any other failure is rethrown unchanged
     */
    async fetchAll() {
        this.reset();
        this.fetchAllTempResources = [];
        // Re-seed the accumulator: toArrayImplementation() merges into it in place,
        // and the previous FeedResponse still references the old object.
        this.fetchAllLastResHeaders = getInitialHeader();
        let response;
        try {
            response = await this.toArrayImplementation();
        }
        catch (error) {
            this.handleSplitError(error);
        }
        return response;
    }
    /**
     * Retrieve the next batch from the feed.
     *
     * This may or may not fetch more pages from the backend depending on your settings
     * and the type of query. Aggregate queries will generally fetch all backend pages
     * before returning the first batch of responses.
     */
    async fetchNext() {
        this.queryPlanPromise = this.fetchQueryPlan();
        if (!this.isInitialized) {
            await this.init();
        }
        let response;
        try {
            response = await this.queryExecutionContext.fetchMore();
        }
        catch (error) {
            // needsQueryPlan() rethrows errors it does not recognise.
            if (this.needsQueryPlan(error)) {
                await this.createPipelinedExecutionContext();
                try {
                    response = await this.queryExecutionContext.fetchMore();
                }
                catch (queryError) {
                    // Always throws, so `response` is defined whenever we get past here.
                    this.handleSplitError(queryError);
                }
            }
            else {
                throw error;
            }
        }
        return new FeedResponse(response.result, response.headers, this.queryExecutionContext.hasMoreResults());
    }
    /**
     * Reset the QueryIterator to the beginning and clear all the resources inside it
     */
    reset() {
        this.queryPlanPromise = undefined;
        this.queryExecutionContext = new DefaultQueryExecutionContext(this.options, this.fetchFunctions);
    }
    /**
     * Drains the current execution context item by item into `fetchAllTempResources`,
     * merging each response's headers into `fetchAllLastResHeaders`.
     * If an item fetch fails with an error that calls for a query plan, the execution
     * context is swapped for a pipelined one and the fetch is retried once.
     *
     * @returns a FeedResponse built from the collected items and merged headers
     */
    async toArrayImplementation() {
        this.queryPlanPromise = this.fetchQueryPlan();
        if (!this.isInitialized) {
            await this.init();
        }
        while (this.queryExecutionContext.hasMoreResults()) {
            let response;
            try {
                response = await this.queryExecutionContext.nextItem();
            }
            catch (error) {
                if (this.needsQueryPlan(error)) {
                    await this.createPipelinedExecutionContext();
                    response = await this.queryExecutionContext.nextItem();
                }
                else {
                    throw error;
                }
            }
            const { result, headers } = response;
            // concatenate the results and fetch more
            mergeHeaders(this.fetchAllLastResHeaders, headers);
            if (result !== undefined) {
                this.fetchAllTempResources.push(result);
            }
        }
        return new FeedResponse(this.fetchAllTempResources, this.fetchAllLastResHeaders, this.queryExecutionContext.hasMoreResults());
    }
    /**
     * Awaits the stashed query plan and replaces the execution context with a
     * PipelinedQueryExecutionContext built from it.
     *
     * @throws the stashed query plan error, if fetching the plan failed
     * @throws Error when the plan contains aggregates but does not use SELECT VALUE
     */
    async createPipelinedExecutionContext() {
        const queryPlanResponse = await this.queryPlanPromise;
        // We always coerce queryPlanPromise to resolved. So if it errored, we need to manually inspect the resolved value
        if (queryPlanResponse instanceof Error) {
            throw queryPlanResponse;
        }
        const queryPlan = queryPlanResponse.result;
        const queryInfo = queryPlan.queryInfo;
        if (queryInfo.aggregates.length > 0 && queryInfo.hasSelectValue === false) {
            throw new Error("Aggregate queries must use the VALUE keyword");
        }
        this.queryExecutionContext = new PipelinedQueryExecutionContext(this.clientContext, this.resourceLink, this.query, this.options, queryPlan);
    }
    /**
     * Starts a query plan request when none is pending and the resource type is an item;
     * otherwise resolves to whatever `queryPlanPromise` currently holds.
     * A failed request resolves to the error object instead of rejecting.
     */
    async fetchQueryPlan() {
        if (!this.queryPlanPromise && this.resourceType === ResourceType.item) {
            return this.clientContext
                .getQueryPlan(getPathFromLink(this.resourceLink) + "/docs", ResourceType.item, this.resourceLink, this.query, this.options)
                .catch((error) => error); // Without this catch, node reports an unhandled rejection. So we stash the promise as resolved even if it errored.
        }
        return this.queryPlanPromise;
    }
    /**
     * Decides whether a failed fetch should be retried through a query plan.
     *
     * An error is considered only when it carries `body.additionalErrorInfo` or its
     * message contains "Cross partition query only supports"; anything else is
     * rethrown unchanged. A missing or non-string message no longer raises a
     * TypeError that would hide the original failure.
     *
     * @param error - the value thrown by the execution context
     * @returns true when the error code is BadRequest and the resource type is an item
     * @throws the original error when it is not one this method recognises
     */
    needsQueryPlan(error) {
        const body = error == null ? undefined : error.body;
        const message = error == null ? undefined : error.message;
        const hasAdditionalErrorInfo = body != null && Boolean(body.additionalErrorInfo);
        const isCrossPartitionMessage = typeof message === "string" && message.includes("Cross partition query only supports");
        if (!hasAdditionalErrorInfo && !isCrossPartitionMessage) {
            throw error;
        }
        return error.code === StatusCodes.BadRequest && this.resourceType === ResourceType.item;
    }
    /**
     * Runs one-time initialisation, sharing a single in-flight promise between
     * concurrent callers.
     */
    async init() {
        if (this.isInitialized === true) {
            return;
        }
        if (this.initPromise === undefined) {
            this.initPromise = this._init();
        }
        return this.initPromise;
    }
    /**
     * Builds the pipelined execution context up front when `options.forceQueryPlan`
     * is set for an item query, then marks the iterator as initialised.
     */
    async _init() {
        if (this.options.forceQueryPlan === true && this.resourceType === ResourceType.item) {
            await this.createPipelinedExecutionContext();
        }
        this.isInitialized = true;
    }
    /**
     * Always throws. An error with `code` 410 is wrapped in a retryable error with
     * `code` 503 that keeps the original on `originalError`; anything else
     * (including null/undefined) is rethrown as is.
     *
     * @param err - the value thrown by the failed request
     */
    handleSplitError(err) {
        if (err != null && err.code === 410) {
            const error = new Error("Encountered partition split and could not recover. This request is retryable");
            error.code = 503;
            error.originalError = err;
            throw error;
        }
        throw err;
    }
}
231//# sourceMappingURL=queryIterator.js.map
\No newline at end of file