1 | import { Observable } from 'rxjs/Observable';
|
2 | import createDebug from 'debug';
|
3 |
|
4 | import 'rxjs/add/observable/merge';
|
5 | import 'rxjs/add/observable/from';
|
6 | import 'rxjs/add/observable/timer';
|
7 | import 'rxjs/add/operator/map';
|
8 | import 'rxjs/add/operator/publishReplay';
|
9 | import 'rxjs/add/operator/retryWhen';
|
10 | import 'rxjs/add/operator/do';
|
11 | import 'rxjs/add/operator/delay';
|
12 | import 'rxjs/add/operator/switchMap';
|
13 |
|
14 | const debug = createDebug('cyboard.server');
|
15 |
|
/**
 * Builds the notifier function passed to `retryWhen` for one widget backend.
 *
 * Each error received on `errors` is answered with a timer whose length grows
 * linearly with the number of failures seen so far: `stepSeconds`, then
 * `2 * stepSeconds`, and so on, never exceeding `maxDelaySeconds`. With the
 * default options the pauses are 2s, 4s, 6s, ... with no upper bound.
 *
 * @param {*} id - Widget identifier; only used in the debug message.
 * @param {Object} [options] - Optional back-off tuning.
 * @param {number} [options.stepSeconds=2] - Seconds added to the pause per failure.
 * @param {number} [options.maxDelaySeconds=Infinity] - Upper bound for the pause, in seconds.
 * @returns {function(Observable): Observable} Function that maps a stream of
 *   errors to a stream of retry timers.
 */
const handleRetry = (id, { stepSeconds = 2, maxDelaySeconds = Infinity } = {}) => (errors) => {
  // Failure counter is scoped to one invocation of the returned function and
  // only ever increases; nothing here resets it after a successful emission.
  let attempt = 0;

  return errors.switchMap(() => {
    attempt += 1;
    const delay = Math.min(stepSeconds * attempt, maxDelaySeconds);
    debug(`widget backend ${id} failed. Retry in ${delay} seconds.`);
    // `Observable.timer` takes milliseconds; `delay` is expressed in seconds.
    return Observable.timer(delay * 1000);
  });
};
|
25 |
|
/**
 * Creates a function that labels a payload with the widget it belongs to.
 *
 * @param {*} id - Widget identifier to attach to every payload.
 * @returns {function(*): {id: *, data: *}} Function wrapping a payload as
 *   `{ id, data }`.
 */
const wrapWidgetData = (id) => {
  const tag = (data) => {
    const envelope = { id, data };
    return envelope;
  };
  return tag;
};
|
27 |
|
/**
 * Turns a widget's backend into a shared stream of `{ id, data }` messages.
 *
 * Steps, in order: convert `widget.backend` with `Observable.from`, tag each
 * value with the widget id, attach the retry notifier from `handleRetry`, then
 * apply `publishReplay(1).refCount()`.
 *
 * @param {{id: *, backend: *}} widget - Widget definition; `backend` must be
 *   something `Observable.from` accepts.
 * @returns {Observable} Result of the `refCount()` call at the end of the chain.
 */
const wrapWidgetBackend = (widget) => {
  const source$ = Observable.from(widget.backend);
  const tagged$ = source$.map(wrapWidgetData(widget.id));
  const retrying$ = tagged$.retryWhen(handleRetry(widget.id));
  const replayed$ = retrying$.publishReplay(1);
  return replayed$.refCount();
};
|
33 |
|
/**
 * Collects the widgets of every board in `config`, wraps each one with
 * `wrapWidgetBackend`, and merges the resulting streams into a single one.
 *
 * Missing pieces are skipped instead of raising: a nullish `config`, a nullish
 * board, a board without `widgets`, and falsy widget entries all contribute
 * nothing to the merged stream.
 *
 * @param {Object} config - Object whose values are board definitions; each
 *   board may carry a `widgets` array.
 * @returns {Observable} Result of `Observable.merge` over all wrapped widgets.
 */
export function mergeWidgetBackends(config) {
  const backends = Object.values(config || {})
    // A board may be null or lack `widgets`; treat both as "no widgets".
    .reduce((widgets, board) => widgets.concat((board && board.widgets) || []), [])
    .filter(widget => !!widget)
    .map(widget => wrapWidgetBackend(widget));

  return Observable.merge(...backends);
}
|