UNPKG

3.98 kBJavaScriptView Raw
1"use strict";
2/**
3 * @license
4 * Copyright Google LLC All Rights Reserved.
5 *
6 * Use of this source code is governed by an MIT-style license that can be
7 * found in the LICENSE file at https://angular.io/license
8 */
9Object.defineProperty(exports, "__esModule", { value: true });
10exports.strategy = void 0;
11const rxjs_1 = require("rxjs");
12const operators_1 = require("rxjs/operators");
13const api_1 = require("./api");
14const stableStringify = require('fast-json-stable-stringify');
15// eslint-disable-next-line @typescript-eslint/no-namespace
16var strategy;
17(function (strategy) {
/**
 * Builds a JobStrategy whose wrapped handlers run strictly one after another.
 *
 * The queue (`tail`) lives in the closure created by this call, so every handler
 * wrapped by the same returned strategy shares it, even across different jobs.
 *
 * @returns A strategy `(handler, options) => handler`. The returned handler
 *   carries a `jobDescription` made of the original handler's description
 *   overlaid with `options`.
 */
function serialize() {
    // Observable returned by the most recent call; starts as an empty observable
    // so the very first job has nothing to wait for.
    let tail = rxjs_1.of();
    return (handler, options) => {
        const jobDescription = Object.assign({}, handler.jobDescription, options);
        const newHandler = (argument, context) => {
            // Wait for the prior run to finish, discarding whatever it emits.
            const priorDone = tail.pipe(operators_1.ignoreElements());
            // The wrapped handler is only invoked once this observable is subscribed,
            // i.e. after `priorDone` has completed inside the concat below.
            const thisRun = new rxjs_1.Observable((observer) => handler(argument, context).subscribe(observer));
            tail = rxjs_1.concat(priorDone, thisRun).pipe(operators_1.shareReplay(0));
            return tail;
        };
        return Object.assign(newHandler, { jobDescription });
    };
}
34 strategy.serialize = serialize;
/**
 * Creates a JobStrategy that will always reuse a running job, and restart it if the job ended.
 *
 * The run, the last recorded state message and the inbound bus live in the closure
 * created by this call, so they are shared by every handler wrapped by the same
 * returned strategy.
 *
 * @param replayMessages Replay ALL messages if a job is reused, otherwise just hook up where it
 *   is.
 * @returns A strategy `(handler, options) => handler`. The returned handler keeps the
 *   original handler's own properties and gets a `jobDescription` made of the original
 *   description overlaid with `options`.
 */
function reuse(replayMessages = false) {
    let inboundBus = new rxjs_1.Subject();
    let run = null;
    let state = null;
    return (handler, options) => {
        const newHandler = (argument, context) => {
            // Forward inputs.
            const subscription = context.inboundBus.subscribe(inboundBus);
            if (run) {
                // NOTE(review): `state` is still null if the current run has not yet emitted
                // a Start/OnReady/End message, in which case `null` is emitted first -
                // confirm that consumers tolerate it.
                return rxjs_1.concat(
                // Update state.
                rxjs_1.of(state), run).pipe(operators_1.finalize(() => subscription.unsubscribe()));
            }
            // Once the job is over, stop forwarding inputs and clear the shared run so
            // that the next call starts a fresh job on a fresh bus.
            const reset = () => {
                subscription.unsubscribe();
                inboundBus = new rxjs_1.Subject();
                run = null;
            };
            run = handler(argument, { ...context, inboundBus: inboundBus.asObservable() }).pipe(operators_1.tap((message) => {
                if (message.kind === api_1.JobOutboundMessageKind.Start ||
                    message.kind === api_1.JobOutboundMessageKind.OnReady ||
                    message.kind === api_1.JobOutboundMessageKind.End) {
                    state = message;
                }
            },
            // A failed job has ended too: reset on error as well as on completion so the
            // subscription does not leak and the dead run is not handed out again.
            reset, reset), replayMessages ? operators_1.shareReplay() : operators_1.share());
            return run;
        };
        // Merge the overrides into `jobDescription`. Assigning them onto the function
        // itself would throw for `name`, which is read-only on functions, and would
        // leave the description unchanged.
        return Object.assign(newHandler, handler, {
            jobDescription: Object.assign({}, handler.jobDescription, options),
        });
    };
}
69 strategy.reuse = reuse;
/**
 * Creates a JobStrategy that will reuse a running job if the argument matches.
 *
 * Runs are keyed by the stable JSON serialization of the argument. The cache lives in
 * the closure created by this call, so it is shared by every handler wrapped by the
 * same returned strategy. Entries are never removed here.
 *
 * @param replayMessages Replay ALL messages if a job is reused, otherwise just hook up where it
 *   is.
 * @returns A strategy `(handler, options) => handler`. The returned handler keeps the
 *   original handler's own properties and gets a `jobDescription` made of the original
 *   description overlaid with `options`.
 */
function memoize(replayMessages = false) {
    const runs = new Map();
    return (handler, options) => {
        const newHandler = (argument, context) => {
            const argumentJson = stableStringify(argument);
            const maybeJob = runs.get(argumentJson);
            if (maybeJob) {
                return maybeJob;
            }
            const run = handler(argument, context).pipe(replayMessages ? operators_1.shareReplay() : operators_1.share());
            runs.set(argumentJson, run);
            return run;
        };
        // Merge the overrides into `jobDescription`. Assigning them onto the function
        // itself would throw for `name`, which is read-only on functions, and would
        // leave the description unchanged.
        return Object.assign(newHandler, handler, {
            jobDescription: Object.assign({}, handler.jobDescription, options),
        });
    };
}
91 strategy.memoize = memoize;
92})(strategy = exports.strategy || (exports.strategy = {}));