UNPKG

13.1 kBJavaScriptView Raw
1"use strict";
2var __importDefault = (this && this.__importDefault) || function (mod) {
3 return (mod && mod.__esModule) ? mod : { "default": mod };
4};
5Object.defineProperty(exports, "__esModule", { value: true });
6exports.Transactions = exports.TransactionAttemptContext = exports.TransactionQueryResult = exports.TransactionGetResult = exports.TransactionResult = exports.DocumentId = void 0;
7const binding_1 = __importDefault(require("./binding"));
8const bindingutilities_1 = require("./bindingutilities");
9const errors_1 = require("./errors");
10const queryexecutor_1 = require("./queryexecutor");
11const transcoders_1 = require("./transcoders");
12const utilities_1 = require("./utilities");
13/**
14 * Represents the path to a document.
15 *
16 * @category Transactions
17 */
18class DocumentId {
19 constructor() {
20 this.bucket = '';
21 this.scope = '';
22 this.collection = '';
23 this.key = '';
24 }
25}
26exports.DocumentId = DocumentId;
27/**
28 * Contains the results of a Transaction.
29 *
30 * @category Transactions
31 */
32class TransactionResult {
33 /**
34 * @internal
35 */
36 constructor(data) {
37 this.transactionId = data.transactionId;
38 this.unstagingComplete = data.unstagingComplete;
39 }
40}
41exports.TransactionResult = TransactionResult;
42/**
43 * Contains the results of a transactional Get operation.
44 *
45 * @category Transactions
46 */
47class TransactionGetResult {
48 /**
49 * @internal
50 */
51 constructor(data) {
52 this.id = data.id;
53 this.content = data.content;
54 this.cas = data.cas;
55 this._links = data._links;
56 this._metadata = data._metadata;
57 }
58}
59exports.TransactionGetResult = TransactionGetResult;
60/**
61 * Contains the results of a transactional Query operation.
62 *
63 * @category Transactions
64 */
65class TransactionQueryResult {
66 /**
67 * @internal
68 */
69 constructor(data) {
70 this.rows = data.rows;
71 this.meta = data.meta;
72 }
73}
74exports.TransactionQueryResult = TransactionQueryResult;
75/**
76 * @internal
77 */
78function translateGetResult(cppRes) {
79 if (!cppRes) {
80 return null;
81 }
82 let content;
83 if (cppRes.content && cppRes.content.length > 0) {
84 try {
85 content = JSON.parse(cppRes.content.toString('utf8'));
86 }
87 catch (e) {
88 content = cppRes.content;
89 }
90 }
91 return new TransactionGetResult({
92 id: cppRes.id,
93 content: content,
94 cas: cppRes.cas,
95 _links: cppRes.links,
96 _metadata: cppRes.metadata,
97 });
98}
99/**
100 * Provides an interface to preform transactional operations in a transaction.
101 *
102 * @category Transactions
103 */
104class TransactionAttemptContext {
105 /**
106 * @internal
107 */
108 constructor(txns, config) {
109 if (!config) {
110 config = {};
111 }
112 this._impl = new binding_1.default.Transaction(txns.impl, {
113 durability_level: (0, bindingutilities_1.durabilityToCpp)(config.durabilityLevel),
114 timeout: config.timeout,
115 query_scan_consistency: (0, bindingutilities_1.queryScanConsistencyToCpp)(undefined),
116 });
117 this._transcoder = new transcoders_1.DefaultTranscoder();
118 }
119 /**
120 @internal
121 */
122 get impl() {
123 return this._impl;
124 }
125 /**
126 * @internal
127 */
128 _newAttempt() {
129 return utilities_1.PromiseHelper.wrap((wrapCallback) => {
130 this._impl.newAttempt((cppErr) => {
131 const err = (0, bindingutilities_1.errorFromCpp)(cppErr);
132 wrapCallback(err);
133 });
134 });
135 }
136 /**
137 * Retrieves the value of a document from the collection.
138 *
139 * @param collection The collection the document lives in.
140 * @param key The document key to retrieve.
141 */
142 async get(collection, key) {
143 return utilities_1.PromiseHelper.wrap((wrapCallback) => {
144 const id = collection._cppDocId(key);
145 this._impl.get({
146 id,
147 }, (cppErr, cppRes) => {
148 const err = (0, bindingutilities_1.errorFromCpp)(cppErr);
149 if (err) {
150 return wrapCallback(err, null);
151 }
152 wrapCallback(err, translateGetResult(cppRes));
153 });
154 });
155 }
156 /**
157 * Inserts a new document to the collection, failing if the document already exists.
158 *
159 * @param collection The collection the document lives in.
160 * @param key The document key to insert.
161 * @param content The document content to insert.
162 */
163 async insert(collection, key, content) {
164 return utilities_1.PromiseHelper.wrap((wrapCallback) => {
165 const id = collection._cppDocId(key);
166 const [data, flags] = this._transcoder.encode(content);
167 this._impl.insert({
168 id,
169 content: {
170 data,
171 flags,
172 },
173 }, (cppErr, cppRes) => {
174 const err = (0, bindingutilities_1.errorFromCpp)(cppErr);
175 if (err) {
176 return wrapCallback(err, null);
177 }
178 wrapCallback(err, translateGetResult(cppRes));
179 });
180 });
181 }
182 /**
183 * Replaces a document in a collection.
184 *
185 * @param doc The document to replace.
186 * @param content The document content to insert.
187 */
188 async replace(doc, content) {
189 return utilities_1.PromiseHelper.wrap((wrapCallback) => {
190 const [data, flags] = this._transcoder.encode(content);
191 this._impl.replace({
192 doc: {
193 id: doc.id,
194 content: Buffer.from(''),
195 cas: doc.cas,
196 links: doc._links,
197 metadata: doc._metadata,
198 },
199 content: {
200 data,
201 flags,
202 },
203 }, (cppErr, cppRes) => {
204 const err = (0, bindingutilities_1.errorFromCpp)(cppErr);
205 if (err) {
206 return wrapCallback(err, null);
207 }
208 wrapCallback(err, translateGetResult(cppRes));
209 });
210 });
211 }
212 /**
213 * Removes a document from a collection.
214 *
215 * @param doc The document to remove.
216 */
217 async remove(doc) {
218 return utilities_1.PromiseHelper.wrap((wrapCallback) => {
219 this._impl.remove({
220 doc: {
221 id: doc.id,
222 content: Buffer.from(''),
223 cas: doc.cas,
224 links: doc._links,
225 metadata: doc._metadata,
226 },
227 }, (cppErr) => {
228 const err = (0, bindingutilities_1.errorFromCpp)(cppErr);
229 wrapCallback(err, null);
230 });
231 });
232 }
233 /**
234 * Executes a query in the context of this transaction.
235 *
236 * @param statement The statement to execute.
237 * @param options Optional parameters for this operation.
238 */
239 async query(statement, options) {
240 // This await statement is explicit here to ensure our query is completely
241 // processed before returning the result to the user (no row streaming).
242 const syncQueryRes = await queryexecutor_1.QueryExecutor.execute((callback) => {
243 if (!options) {
244 options = {};
245 }
246 this._impl.query(statement, {
247 scan_consistency: (0, bindingutilities_1.queryScanConsistencyToCpp)(options.scanConsistency),
248 ad_hoc: options.adhoc === false ? false : true,
249 client_context_id: options.clientContextId,
250 pipeline_batch: options.pipelineBatch,
251 pipeline_cap: options.pipelineCap,
252 max_parallelism: options.maxParallelism,
253 scan_wait: options.scanWait,
254 scan_cap: options.scanCap,
255 readonly: options.readOnly || false,
256 profile: (0, bindingutilities_1.queryProfileToCpp)(options.profile),
257 metrics: options.metrics || false,
258 raw: options.raw
259 ? Object.fromEntries(Object.entries(options.raw)
260 .filter(([, v]) => v !== undefined)
261 .map(([k, v]) => [k, Buffer.from(JSON.stringify(v))]))
262 : {},
263 positional_parameters: options.parameters && Array.isArray(options.parameters)
264 ? options.parameters.map((v) => Buffer.from(JSON.stringify(v !== null && v !== void 0 ? v : null)))
265 : [],
266 named_parameters: options.parameters && !Array.isArray(options.parameters)
267 ? Object.fromEntries(Object.entries(options.parameters)
268 .filter(([, v]) => v !== undefined)
269 .map(([k, v]) => [k, Buffer.from(JSON.stringify(v))]))
270 : {},
271 }, (cppErr, resp) => {
272 callback(cppErr, resp);
273 });
274 });
275 return new TransactionQueryResult({
276 rows: syncQueryRes.rows,
277 meta: syncQueryRes.meta,
278 });
279 }
280 /**
281 * @internal
282 */
283 async _commit() {
284 return utilities_1.PromiseHelper.wrap((wrapCallback) => {
285 this._impl.commit((cppErr, cppRes) => {
286 const err = (0, bindingutilities_1.errorFromCpp)(cppErr);
287 let res = null;
288 if (cppRes) {
289 res = new TransactionResult({
290 transactionId: cppRes.transaction_id,
291 unstagingComplete: cppRes.unstaging_complete,
292 });
293 }
294 wrapCallback(err, res);
295 });
296 });
297 }
298 /**
299 * @internal
300 */
301 async _rollback() {
302 return utilities_1.PromiseHelper.wrap((wrapCallback) => {
303 this._impl.rollback((cppErr) => {
304 const err = (0, bindingutilities_1.errorFromCpp)(cppErr);
305 wrapCallback(err);
306 });
307 });
308 }
309}
310exports.TransactionAttemptContext = TransactionAttemptContext;
311/**
312 * Provides an interface to access transactions.
313 *
314 * @category Transactions
315 */
316class Transactions {
317 /**
318 @internal
319 */
320 constructor(cluster, config) {
321 if (!config) {
322 config = {};
323 }
324 if (!config.cleanupConfig) {
325 config.cleanupConfig = {};
326 }
327 if (!config.queryConfig) {
328 config.queryConfig = {};
329 }
330 const connImpl = cluster.conn;
331 try {
332 const txnsImpl = new binding_1.default.Transactions(connImpl, {
333 durability_level: (0, bindingutilities_1.durabilityToCpp)(config.durabilityLevel),
334 timeout: config.timeout,
335 query_scan_consistency: (0, bindingutilities_1.queryScanConsistencyToCpp)(config.queryConfig.scanConsistency),
336 cleanup_window: config.cleanupConfig.cleanupWindow,
337 cleanup_lost_attempts: !config.cleanupConfig.disableLostAttemptCleanup,
338 cleanup_client_attempts: !config.cleanupConfig.disableClientAttemptCleanup,
339 metadata_collection: (0, bindingutilities_1.transactionKeyspaceToCpp)(config.metadataCollection),
340 });
341 this._cluster = cluster;
342 this._impl = txnsImpl;
343 }
344 catch (err) {
345 throw (0, bindingutilities_1.errorFromCpp)(err);
346 }
347 }
348 /**
349 @internal
350 */
351 get impl() {
352 return this._impl;
353 }
354 /**
355 @internal
356 */
357 _close() {
358 return utilities_1.PromiseHelper.wrap((wrapCallback) => {
359 this._impl.close((cppErr) => {
360 const err = (0, bindingutilities_1.errorFromCpp)(cppErr);
361 wrapCallback(err, null);
362 });
363 });
364 }
365 /**
366 * Executes a transaction.
367 *
368 * @param logicFn The transaction lambda to execute.
369 * @param config Configuration operations for the transaction.
370 */
371 async run(logicFn, config) {
372 const txn = new TransactionAttemptContext(this, config);
373 for (;;) {
374 await txn._newAttempt();
375 try {
376 await logicFn(txn);
377 }
378 catch (e) {
379 await txn._rollback();
380 if (e instanceof errors_1.TransactionOperationFailedError) {
381 throw new errors_1.TransactionFailedError(e.cause, e.context);
382 }
383 throw new errors_1.TransactionFailedError(e);
384 }
385 try {
386 const txnResult = await txn._commit(); // this is actually finalize internally
387 if (!txnResult) {
388 // no result and no error, try again
389 continue;
390 }
391 return txnResult;
392 }
393 catch (e) {
394 // commit failed, retry...
395 }
396 }
397 }
398}
399exports.Transactions = Transactions;