1 |
|
2 |
|
3 |
|
4 | import { IStreamResult, IStreamSubscriber, ISubscription } from "./Stream";
|
5 | import { SubjectSubscription } from "./Utils";
|
6 |
|
7 |
|
/**
 * A simple multicast stream source: every value, error, or completion signal
 * pushed into the subject is forwarded to each currently registered observer.
 *
 * Dispatch always iterates over a snapshot of `observers`, so an observer that
 * removes itself (or another observer) from the list while it is being
 * notified cannot cause a later observer to be skipped.
 */
export class Subject<T> implements IStreamResult<T> {
    /** Observers currently registered; notified in insertion order. */
    public observers: Array<IStreamSubscriber<T>>;

    /**
     * Optional asynchronous cancellation hook. It is never invoked by this
     * class itself.
     * NOTE(review): presumably called by the subscription object when the
     * stream is cancelled — confirm against `SubjectSubscription`.
     */
    public cancelCallback?: () => Promise<void>;

    constructor() {
        this.observers = [];
    }

    /**
     * Forwards `item` to the `next` handler of every registered observer.
     *
     * @param item The value to deliver.
     */
    public next(item: T): void {
        // Snapshot first: a handler may mutate `this.observers` re-entrantly.
        for (const observer of this.observers.slice()) {
            observer.next(item);
        }
    }

    /**
     * Forwards `err` to every registered observer that defines an `error` handler.
     *
     * @param err The error value to deliver.
     */
    public error(err: any): void {
        // Snapshot first: a handler may mutate `this.observers` re-entrantly.
        for (const observer of this.observers.slice()) {
            if (observer.error) {
                observer.error(err);
            }
        }
    }

    /**
     * Signals completion to every registered observer that defines a
     * `complete` handler.
     */
    public complete(): void {
        // Snapshot first: a handler may mutate `this.observers` re-entrantly.
        for (const observer of this.observers.slice()) {
            if (observer.complete) {
                observer.complete();
            }
        }
    }

    /**
     * Registers `observer` and returns a subscription object tied to this
     * subject and that observer.
     *
     * @param observer The observer to append to the list.
     * @returns A new `SubjectSubscription` built from this subject and `observer`.
     */
    public subscribe(observer: IStreamSubscriber<T>): ISubscription<T> {
        this.observers.push(observer);
        return new SubjectSubscription(this, observer);
    }
}
|