UNPKG

7.48 kBJavaScriptView Raw
1import _ from 'lodash';
2import Reference from './Reference.js';
3import {wrapPromiseCallback} from './utils/promises.js';
4import {joinPath} from './utils/paths.js';
5
6
// Keys accepted by Dispatcher#intercept. A key is matched against an operation's method, its
// type, or the catch-all 'all' when callbacks are looked up.
// 'unathenticate' is a long-standing misspelling that is kept so existing registrations keep
// working; the correctly spelled 'unauthenticate' is accepted as well.
// NOTE(review): presumably callers dispatch the method as 'unauthenticate' -- confirm.
const INTERCEPT_KEYS = [
  'read', 'write', 'auth', 'set', 'update', 'commit', 'connect', 'peek', 'authenticate',
  'unauthenticate', 'unathenticate', 'certify', 'all'
];

// Shared fallback used when no callbacks are registered under a key. It is only ever passed to
// concat() in this file, so it must stay empty.
const EMPTY_ARRAY = [];
13
14
/**
 * Timer wrapper that invokes a callback once an operation has been pending for longer than a
 * given delay, and invokes it one more time when that fired state is subsequently cancelled.
 */
class SlowHandle {
  /**
   * @param {Object} operation Operation being watched. Its `_startTimestamp` is read to work out
   *     how much of the delay has already elapsed.
   * @param {number} delay Milliseconds, measured from the operation's start timestamp, after
   *     which the callback fires.
   * @param {function(Object)} callback Called with the operation when the delay expires, and
   *     once more when the handle is cancelled after having fired.
   */
  constructor(operation, delay, callback) {
    this._operation = operation;
    this._delay = delay;
    this._callback = callback;
    this._fired = false;
    this._timeoutId = undefined;
  }

  /**
   * (Re)arms the timer. Any previous timer is cancelled first, which also delivers the
   * follow-up notification if the previous timer had already fired.
   */
  initiate() {
    // cancel() leaves the handle with no pending timer and `_fired === false`.
    this.cancel();
    const elapsed = Date.now() - this._operation._startTimestamp;
    // A non-positive remaining delay makes the timer fire as soon as possible.
    this._timeoutId = setTimeout(() => {
      this._timeoutId = undefined;
      this._fired = true;
      this._callback(this._operation);
    }, this._delay - elapsed);
  }

  /**
   * Stops a pending timer. If the timer had already fired, the callback is invoked exactly once
   * more; repeated cancel() calls do not notify again.
   */
  cancel() {
    if (this._timeoutId) {
      clearTimeout(this._timeoutId);
      this._timeoutId = undefined;
    }
    if (this._fired) {
      // Reset before notifying so a re-entrant or repeated cancel() cannot notify twice.
      this._fired = false;
      this._callback(this._operation);
    }
  }
}
38
39
/**
 * Mutable record describing one dispatched operation: what it is (type, method, target,
 * operand) and where it is in its lifecycle (ready / running / ended / tries / error).
 */
class Operation {
  /**
   * @param {string} type Operation type.
   * @param {string} method Operation method; 'update' changes how `targets` is computed.
   * @param {Object} target Target of the operation.
   * @param {*} operand Operand of the operation; for 'update' its keys are treated as escaped
   *     path fragments relative to the target.
   */
  constructor(type, method, target, operand) {
    Object.assign(this, {
      _type: type,
      _method: method,
      _target: target,
      _operand: operand,
      _ready: false,
      _running: false,
      _ended: false,
      _tries: 0,
      _startTimestamp: Date.now(),
      _slowHandles: []
    });
  }

  get type() {
    return this._type;
  }

  get method() {
    return this._method;
  }

  get target() {
    return this._target;
  }

  /**
   * For an 'update', one Reference per operand key (each key joined onto the target's path);
   * for every other method, a single-element array holding the target itself.
   */
  get targets() {
    const target = this._target;
    if (this._method === 'update') {
      return _.map(this._operand, (ignoredValue, escapedPathFragment) => {
        const tree = target._tree;
        const childPath = joinPath(target.path, escapedPathFragment);
        return new Reference(tree, childPath, target._annotations);
      });
    }
    return [target];
  }

  get operand() {
    return this._operand;
  }

  get ready() {
    return this._ready;
  }

  get running() {
    return this._running;
  }

  get ended() {
    return this._ended;
  }

  get tries() {
    return this._tries;
  }

  // Undefined until something assigns `_error` on this operation.
  get error() {
    return this._error;
  }

  /**
   * Registers a slow-operation callback and arms its timer immediately.
   * @param {number} delay Milliseconds after the operation's start timestamp.
   * @param {function(Operation)} callback Receives this operation.
   */
  onSlow(delay, callback) {
    const slowHandle = new SlowHandle(this, delay, callback);
    this._slowHandles.push(slowHandle);
    slowHandle.initiate();
  }

  _setRunning(value) {
    this._running = value;
  }

  _setEnded(value) {
    this._ended = value;
  }

  /**
   * Flags the operation as ready and cancels every registered slow handle.
   * @param {boolean=} ending When falsy, the tries counter is reset to zero as well.
   */
  _markReady(ending) {
    this._ready = true;
    if (!ending) {
      this._tries = 0;
    }
    this._slowHandles.forEach(slowHandle => {
      slowHandle.cancel();
    });
  }

  /**
   * Flags the operation as not ready, restarts its clock and re-arms every slow handle.
   */
  _clearReady() {
    this._ready = false;
    this._startTimestamp = Date.now();
    this._slowHandles.forEach(slowHandle => {
      slowHandle.initiate();
    });
  }

  _incrementTries() {
    this._tries += 1;
  }
}
102
103
/**
 * Runs operations through registered interceptor callbacks (`onBefore`, `onAfter`, `onError`,
 * `onFailure`) and drives each operation's begin / retry / end lifecycle.
 */
export default class Dispatcher {
  /**
   * @param {Object} bridge Stored on the instance; not otherwise used within this class.
   */
  constructor(bridge) {
    this._bridge = bridge;
    // Callback lists keyed by `${stage}_${interceptKey}`. The instance is frozen below, but this
    // object itself stays mutable so callbacks can still be added and removed.
    this._callbacks = {};
    Object.freeze(this);
  }

  /**
   * Registers interceptor callbacks under the given key.
   * @param {string} interceptKey One of INTERCEPT_KEYS.
   * @param {Object} callbacks Any subset of `onBefore`, `onAfter`, `onError`, `onFailure`.
   * @return {function()} Function that unregisters exactly the callbacks added by this call.
   * @throws {Error} If the key is not in INTERCEPT_KEYS or `callbacks` has an unknown key.
   */
  intercept(interceptKey, callbacks) {
    if (!_.includes(INTERCEPT_KEYS, interceptKey)) {
      throw new Error('Unknown intercept operation type: ' + interceptKey);
    }
    const badCallbackKeys =
      _.difference(_.keys(callbacks), ['onBefore', 'onAfter', 'onError', 'onFailure']);
    if (badCallbackKeys.length) {
      throw new Error('Unknown intercept callback types: ' + badCallbackKeys.join(', '));
    }
    // Keep the wrapped versions: those are what is stored, so those are what must be removed.
    const wrappedCallbacks = {
      onBefore: this._addCallback('onBefore', interceptKey, callbacks.onBefore),
      onAfter: this._addCallback('onAfter', interceptKey, callbacks.onAfter),
      onError: this._addCallback('onError', interceptKey, callbacks.onError),
      onFailure: this._addCallback('onFailure', interceptKey, callbacks.onFailure)
    };
    return this._removeCallbacks.bind(this, interceptKey, wrappedCallbacks);
  }

  /**
   * Wraps and stores a callback for a stage/key pair.
   * @return {Function|undefined} The wrapped callback, or undefined when none was supplied.
   */
  _addCallback(stage, interceptKey, callback) {
    if (!callback) return;
    const key = this._getCallbacksKey(stage, interceptKey);
    // NOTE(review): wrapPromiseCallback presumably makes the callback always return a promise
    // (callers here chain .catch()/Promise.all on the results) -- confirm in utils/promises.js.
    const wrappedCallback = wrapPromiseCallback(callback);
    (this._callbacks[key] || (this._callbacks[key] = [])).push(wrappedCallback);
    return wrappedCallback;
  }

  /** Removes a previously stored wrapped callback; a missing callback or list is a no-op. */
  _removeCallback(stage, interceptKey, wrappedCallback) {
    if (!wrappedCallback) return;
    const key = this._getCallbacksKey(stage, interceptKey);
    if (this._callbacks[key]) _.pull(this._callbacks[key], wrappedCallback);
  }

  /** Removes every callback in a `{stage: wrappedCallback}` map for the given key. */
  _removeCallbacks(interceptKey, wrappedCallbacks) {
    _.forEach(wrappedCallbacks, (wrappedCallback, stage) => {
      this._removeCallback(stage, interceptKey, wrappedCallback);
    });
  }

  /**
   * Collects the callbacks applicable to an operation for one stage.
   * @return {Array<Function>} New array: callbacks registered for the method, then for the
   *     operation type, then for 'all'. Always an array, possibly empty.
   */
  _getCallbacks(stage, operationType, method) {
    return [].concat(
      this._callbacks[this._getCallbacksKey(stage, method)] || EMPTY_ARRAY,
      this._callbacks[this._getCallbacksKey(stage, operationType)] || EMPTY_ARRAY,
      this._callbacks[this._getCallbacksKey(stage, 'all')] || EMPTY_ARRAY
    );
  }

  _getCallbacksKey(stage, interceptKey) {
    return `${stage}_${interceptKey}`;
  }

  /**
   * Creates an operation, runs its `onBefore` callbacks, invokes the executor (re-invoking it
   * for as long as the retry logic allows), then ends the operation.
   * @param {Function} executor Called with no arguments to perform the actual work.
   * @return {Promise} Resolves to the executor's result once `end()` has completed; rejects if
   *     beginning, executing (without a successful retry) or ending fails.
   */
  execute(operationType, method, target, operand, executor) {
    executor = wrapPromiseCallback(executor);
    const operation = this.createOperation(operationType, method, target, operand);
    return this.begin(operation).then(() => {
      const executeWithRetries = () => {
        // On failure, _retryOrEnd either resolves (so we run again) or rejects (so we stop).
        return executor().catch(e => this._retryOrEnd(operation, e).then(executeWithRetries));
      };
      return executeWithRetries();
    }).then(result => this.end(operation).then(() => result));
  }

  createOperation(operationType, method, target, operand) {
    return new Operation(operationType, method, target, operand);
  }

  /**
   * Runs all applicable `onBefore` callbacks. When they all succeed and the operation has not
   * ended in the meantime it is marked as running; when any fails the operation is ended with
   * that error.
   * @return {Promise}
   */
  begin(operation) {
    return Promise.all(_.map(
      this._getCallbacks('onBefore', operation.type, operation.method),
      onBefore => onBefore(operation)
    )).then(() => {
      if (!operation.ended) operation._setRunning(true);
    }, e => this.end(operation, e));
  }

  markReady(operation) {
    operation._markReady();
  }

  clearReady(operation) {
    operation._clearReady();
  }

  /**
   * Records a failed attempt and asks the `onError` callbacks whether to retry.
   * @return {Promise<boolean|undefined>} True when at least one callback returned a truthy
   *     value (the recorded error is then cleared), false when none did, and undefined when the
   *     operation ended while the callbacks were running.
   */
  retry(operation, error) {
    operation._incrementTries();
    operation._error = error;
    return Promise.all(_.map(
      this._getCallbacks('onError', operation.type, operation.method),
      onError => onError(operation, error)
    )).then(results => {
      // If the operation ended in the meantime, bail. This will cause the caller to attempt to
      // fail the operation, but since it's already ended the call to end() with an error will be a
      // no-op.
      if (operation.ended) return;
      const retrying = _.some(results);
      if (retrying) delete operation._error;
      return retrying;
    });
  }

  /**
   * Resolves when the operation should be retried; otherwise ends the operation with the error
   * (or with the error thrown by an `onError` callback) and returns end()'s promise.
   */
  _retryOrEnd(operation, error) {
    return this.retry(operation, error).then(result => {
      if (!result) return this.end(operation, error);
    }, e => this.end(operation, e));
  }

  /**
   * Ends an operation once: flags it as ended and not running, sets or clears its error, runs
   * the `onAfter` callbacks and then finalizes via _afterEnd(). An `onAfter` rejection replaces
   * the operation's error. Calling end() on an already-ended operation resolves immediately.
   * @param {Error=} error Failure that ended the operation, if any.
   * @return {Promise} Resolves on success; rejects with the operation's final error.
   */
  end(operation, error) {
    if (operation.ended) return Promise.resolve();
    operation._setRunning(false);
    operation._setEnded(true);
    if (error) {
      operation._error = error;
    } else {
      // In case we're racing with a retry(), wipe out the error.
      delete operation._error;
    }
    return Promise.all(_.map(
      this._getCallbacks('onAfter', operation.type, operation.method),
      onAfter => onAfter(operation)
    )).then(
      () => this._afterEnd(operation),
      e => {
        operation._error = e;
        return this._afterEnd(operation);
      }
    );
  }

  /**
   * Marks the operation ready and settles the outcome. When the operation carries an error, any
   * applicable `onFailure` callbacks are scheduled on a zero-delay timer and the returned
   * promise rejects with that error.
   * @return {Promise}
   */
  _afterEnd(operation) {
    operation._markReady(true);
    if (!operation.error) return Promise.resolve();
    const onFailureCallbacks = this._getCallbacks('onFailure', operation.type, operation.method);
    // _getCallbacks always returns an array, so test its length rather than its truthiness;
    // otherwise a no-op timer gets scheduled for every failure even with nothing registered.
    if (onFailureCallbacks.length) {
      setTimeout(() => {
        _.forEach(onFailureCallbacks, onFailure => onFailure(operation));
      }, 0);
    }
    return Promise.reject(operation.error);
  }
}
250