UNPKG

6.31 kBJavaScriptView Raw
1"use strict";
2/*
3 * Copyright The OpenTelemetry Authors
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17Object.defineProperty(exports, "__esModule", { value: true });
18exports.BatchSpanProcessorBase = void 0;
19const api_1 = require("@opentelemetry/api");
20const core_1 = require("@opentelemetry/core");
/**
 * Implementation of the {@link SpanProcessor} that batches spans exported by
 * the SDK then pushes them to the exporter pipeline.
 *
 * Ended spans are appended to an in-memory buffer (bounded by `maxQueueSize`;
 * spans arriving while the buffer is full are dropped). A timer armed when a
 * span is buffered sends at most `maxExportBatchSize` spans to the exporter
 * every `scheduledDelayMillis`. `forceFlush()` and `shutdown()` drain the
 * whole buffer immediately, one batch-sized export call per batch.
 *
 * `shutdown()` calls `this.onShutdown()`, which this class does not define;
 * it has to be supplied by a subclass (or otherwise assigned on the instance).
 */
class BatchSpanProcessorBase {
  /**
   * @param _exporter Object this processor calls `export(spans, resultCallback)`
   *     and `shutdown()` on.
   * @param config Optional overrides: `maxExportBatchSize`, `maxQueueSize`,
   *     `scheduledDelayMillis`, `exportTimeoutMillis`. Any option that is not
   *     of type `number` falls back to the matching `OTEL_BSP_*` value
   *     returned by `getEnv()`.
   */
  constructor(_exporter, config) {
    this._exporter = _exporter;
    this._finishedSpans = [];
    this._isShutdown = false;
    this._shuttingDownPromise = Promise.resolve();
    const env = core_1.getEnv();
    // Use the configured value only when it really is a number; a missing
    // config object or a non-number option selects the environment default.
    const numberOr = (key, fallback) => config !== null && config !== undefined && typeof config[key] === 'number'
      ? config[key]
      : fallback;
    this._maxExportBatchSize = numberOr('maxExportBatchSize', env.OTEL_BSP_MAX_EXPORT_BATCH_SIZE);
    this._maxQueueSize = numberOr('maxQueueSize', env.OTEL_BSP_MAX_QUEUE_SIZE);
    this._scheduledDelayMillis = numberOr('scheduledDelayMillis', env.OTEL_BSP_SCHEDULE_DELAY);
    this._exportTimeoutMillis = numberOr('exportTimeoutMillis', env.OTEL_BSP_EXPORT_TIMEOUT);
  }

  /**
   * Exports every buffered span now.
   *
   * @returns A promise that settles once all batches have been handed to the
   *     exporter and reported back; after `shutdown()` has been called, the
   *     shutdown promise is returned instead.
   */
  forceFlush() {
    if (this._isShutdown) {
      return this._shuttingDownPromise;
    }
    return this._flushAll();
  }

  // does nothing.
  onStart(_span) { }

  /**
   * Buffers an ended span for a later export. Ignored after shutdown.
   *
   * @param span The span that just ended.
   */
  onEnd(span) {
    if (this._isShutdown) {
      return;
    }
    this._addToBuffer(span);
  }

  /**
   * Shuts the processor down: runs `onShutdown()`, flushes all buffered spans,
   * then shuts the exporter down, in that order. Repeated calls return the
   * promise created by the first call.
   *
   * @returns A promise that rejects if any of the three steps fails.
   */
  shutdown() {
    if (this._isShutdown) {
      return this._shuttingDownPromise;
    }
    this._isShutdown = true;
    // Starting from a resolved promise turns a synchronous throw in
    // onShutdown() into a rejection instead of an exception for the caller.
    this._shuttingDownPromise = Promise.resolve()
      .then(() => this.onShutdown())
      .then(() => this._flushAll())
      .then(() => this._exporter.shutdown());
    return this._shuttingDownPromise;
  }

  /** Add a span in the buffer. */
  _addToBuffer(span) {
    if (this._finishedSpans.length >= this._maxQueueSize) {
      // limit reached, drop span
      return;
    }
    this._finishedSpans.push(span);
    this._maybeStartTimer();
  }

  /**
   * Send all spans to the exporter respecting the batch size limit
   * This function is used only on forceFlush or shutdown,
   * for all other cases _flushOneBatch should be used
   *
   * @returns A promise that resolves (with no value) when every batch was
   *     exported successfully and rejects with the first batch failure.
   */
  _flushAll() {
    // Number of export calls needed to empty the buffer. Each
    // _flushOneBatch() call removes its batch from the buffer synchronously,
    // so the loop drains the buffer before any export has completed.
    // NOTE(review): a maxExportBatchSize of 0 makes this count Infinity.
    const batchCount = Math.ceil(this._finishedSpans.length / this._maxExportBatchSize);
    const pending = [];
    for (let i = 0; i < batchCount; i++) {
      pending.push(this._flushOneBatch());
    }
    return Promise.all(pending).then(() => undefined);
  }

  /**
   * Exports the next batch (at most `maxExportBatchSize` spans) and cancels
   * the scheduled-flush timer.
   *
   * @returns A promise that resolves when the exporter reports success (or
   *     immediately when the buffer is empty). It rejects with the exporter's
   *     error, with `Error('Timeout')` when the exporter has not reported back
   *     within `exportTimeoutMillis`, or with whatever the export call threw.
   */
  _flushOneBatch() {
    this._clearTimer();
    if (this._finishedSpans.length === 0) {
      return Promise.resolve();
    }
    return new Promise((resolve, reject) => {
      const timer = setTimeout(() => {
        // don't wait anymore for export, this way the next batch can start
        reject(new Error('Timeout'));
      }, this._exportTimeoutMillis);
      try {
        // prevent downstream exporter calls from generating spans
        api_1.context.with(core_1.suppressTracing(api_1.context.active()), () => {
          // Take the batch out of the buffer right here: removing it outside
          // of this callback could let a later flush hand the same spans to
          // the exporter a second time.
          this._exporter.export(this._finishedSpans.splice(0, this._maxExportBatchSize), result => {
            clearTimeout(timer);
            if (result.code === core_1.ExportResultCode.SUCCESS) {
              resolve();
              return;
            }
            const failure = result.error !== null && result.error !== undefined
              ? result.error
              : new Error('BatchSpanProcessor: span export failed');
            reject(failure);
          });
        });
      }
      catch (err) {
        // The export call itself threw: without this the timeout timer would
        // stay armed for the full exportTimeoutMillis although the promise
        // is already settled.
        clearTimeout(timer);
        reject(err);
      }
    });
  }

  /**
   * Arms the scheduled-flush timer unless one is already pending. When it
   * fires, one batch is exported; if spans are still buffered afterwards the
   * timer is armed again.
   */
  _maybeStartTimer() {
    if (this._timer !== undefined) {
      return;
    }
    const rescheduleIfPending = () => {
      if (this._finishedSpans.length > 0) {
        this._clearTimer();
        this._maybeStartTimer();
      }
    };
    this._timer = setTimeout(() => {
      this._flushOneBatch()
        // Re-arm on failure as well as on success. A failed or timed-out
        // export only loses its own batch; the spans still buffered must not
        // be left waiting for the next span to end.
        .then(rescheduleIfPending, e => {
          rescheduleIfPending();
          throw e;
        })
        .catch(e => {
          core_1.globalErrorHandler(e);
        });
    }, this._scheduledDelayMillis);
    // NOTE(review): presumably unrefs the timer so that it does not keep the
    // process alive; behaviour lives in @opentelemetry/core.
    core_1.unrefTimer(this._timer);
  }

  /** Cancels the scheduled-flush timer, if one is pending. */
  _clearTimer() {
    if (this._timer !== undefined) {
      clearTimeout(this._timer);
      this._timer = undefined;
    }
  }
}
167exports.BatchSpanProcessorBase = BatchSpanProcessorBase;
168//# sourceMappingURL=BatchSpanProcessorBase.js.map
\No newline at end of file