UNPKG

6.25 kBPlain TextView Raw
1import { TestRunner } from '@stryker-mutator/api/test-runner';
2import { notEmpty } from '@stryker-mutator/util';
3import { BehaviorSubject, filter, ignoreElements, lastValueFrom, mergeMap, Observable, ReplaySubject, Subject, takeUntil, tap, zip } from 'rxjs';
4import { Disposable, tokens } from 'typed-inject';
5
6import { CheckerFacade } from '../checker/index.js';
7import { coreTokens } from '../di/index.js';
8
9const MAX_CONCURRENT_INIT = 2;
10
11/**
12 * Represents a TestRunner that is also a Resource (with an init and dispose)
13 */
14export type TestRunnerResource = Resource & TestRunner;
15
16export interface Resource extends Partial<Disposable> {
17 init?(): Promise<void>;
18}
19
20createTestRunnerPool.inject = tokens(coreTokens.testRunnerFactory, coreTokens.testRunnerConcurrencyTokens);
21export function createTestRunnerPool(factory: () => TestRunnerResource, concurrencyToken$: Observable<number>): Pool<TestRunner> {
22 return new Pool(factory, concurrencyToken$);
23}
24
25createCheckerPool.inject = tokens(coreTokens.checkerFactory, coreTokens.checkerConcurrencyTokens);
26export function createCheckerPool(factory: () => CheckerFacade, concurrencyToken$: Observable<number>): Pool<CheckerFacade> {
27 return new Pool<CheckerFacade>(factory, concurrencyToken$);
28}
29
30/**
31 * Represents a work item: an input with a task and with a `result$` observable where the result (exactly one) will be streamed to.
32 */
33class WorkItem<TResource extends Resource, TIn, TOut> {
34 private readonly resultSubject = new Subject<TOut>();
35 public readonly result$ = this.resultSubject.asObservable();
36
37 /**
38 * @param input The input to the ask
39 * @param task The task, where a resource and input is presented
40 */
41 constructor(private readonly input: TIn, private readonly task: (resource: TResource, input: TIn) => Promise<TOut> | TOut) {}
42
43 public async execute(resource: TResource) {
44 try {
45 const output = await this.task(resource, this.input);
46 this.resultSubject.next(output);
47 this.resultSubject.complete();
48 } catch (err) {
49 this.resultSubject.error(err);
50 }
51 }
52
53 public reject(error: unknown) {
54 this.resultSubject.error(error);
55 }
56
57 public complete() {
58 this.resultSubject.complete();
59 }
60}
61
62/**
63 * Represents a pool of resources. Use `schedule` to schedule work to be executed on the resources.
64 * The pool will automatically recycle the resources, but will make sure only one task is executed
65 * on one resource at any one time. Creates as many resources as the concurrency tokens allow.
66 * Also takes care of the initialing of the resources (with `init()`)
67 */
68export class Pool<TResource extends Resource> implements Disposable {
69 // The init subject. Using an RxJS subject instead of a promise, so errors are silently ignored when nobody is listening
70 private readonly initSubject = new ReplaySubject<void>();
71
72 // The disposedSubject emits true when it is disposed, and false when not disposed yet
73 private readonly disposedSubject = new BehaviorSubject(false);
74
75 // The dispose$ only emits one `true` value when disposed (never emits `false`). Useful for `takeUntil`
76 private readonly dispose$ = this.disposedSubject.pipe(filter((isDisposed) => isDisposed));
77
78 private readonly createdResources: TResource[] = [];
79 // The queued work items. This is a replay subject, so scheduled work items can easily be rejected after it was picked up
80 private readonly todoSubject = new ReplaySubject<WorkItem<TResource, any, any>>();
81
82 constructor(factory: () => TResource, concurrencyToken$: Observable<number>) {
83 // Stream resources that are ready to pick up work
84 const resourcesSubject = new Subject<TResource>();
85
86 // Stream ongoing work.
87 zip(resourcesSubject, this.todoSubject)
88 .pipe(
89 mergeMap(async ([resource, workItem]) => {
90 await workItem.execute(resource);
91 resourcesSubject.next(resource); // recycle resource so it can pick up more work
92 }),
93 ignoreElements(),
94 takeUntil(this.dispose$)
95 )
96 .subscribe({
97 error: (error) => {
98 this.todoSubject.subscribe((workItem) => workItem.reject(error));
99 },
100 });
101
102 // Create resources
103 concurrencyToken$
104 .pipe(
105 takeUntil(this.dispose$),
106 mergeMap(async () => {
107 if (this.disposedSubject.value) {
108 // Don't create new resources when disposed
109 return;
110 }
111 const resource = factory();
112 this.createdResources.push(resource);
113 await resource.init?.();
114 return resource;
115 }, MAX_CONCURRENT_INIT),
116 filter(notEmpty),
117 tap({
118 complete: () => {
119 // Signal init complete
120 this.initSubject.next();
121 this.initSubject.complete();
122 },
123 error: (err) => {
124 this.initSubject.error(err);
125 },
126 })
127 )
128 .subscribe({
129 next: (resource) => resourcesSubject.next(resource),
130 error: (err) => resourcesSubject.error(err),
131 });
132 }
133
134 /**
135 * Returns a promise that resolves if all concurrency tokens have resulted in initialized resources.
136 * This is optional, resources will get initialized either way.
137 */
138 public async init(): Promise<void> {
139 await lastValueFrom(this.initSubject);
140 }
141
142 /**
143 * Schedules a task to be executed on resources in the pool. Each input is paired with a resource, which allows async work to be done.
144 * @param input$ The inputs to pair up with a resource.
145 * @param task The task to execute on each resource
146 */
147 public schedule<TIn, TOut>(input$: Observable<TIn>, task: (resource: TResource, input: TIn) => Promise<TOut> | TOut): Observable<TOut> {
148 return input$.pipe(
149 mergeMap((input) => {
150 const workItem = new WorkItem(input, task);
151 this.todoSubject.next(workItem);
152 return workItem.result$;
153 })
154 );
155 }
156
157 /**
158 * Dispose the pool
159 */
160 public async dispose(): Promise<void> {
161 if (!this.disposedSubject.value) {
162 this.disposedSubject.next(true);
163 this.todoSubject.subscribe((workItem) => workItem.complete());
164 this.todoSubject.complete();
165 await Promise.all(this.createdResources.map((resource) => resource.dispose?.()));
166 }
167 }
168}
169
\No newline at end of file