UNPKG

1.83 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3exports.KafkaParser = void 0;
4const shared_utils_1 = require("@nestjs/common/utils/shared.utils");
/**
 * Turns raw Kafka messages into plain JavaScript values by decoding the
 * message `value`, `key` and every header through `decode`.
 */
class KafkaParser {
    /**
     * @param {{ keepBinary?: boolean }} [config] Optional settings. When
     *   `keepBinary` is truthy, `parse` leaves the message `value` untouched;
     *   the key and headers are still decoded.
     */
    constructor(config) {
        this.keepBinary = (config && config.keepBinary) || false;
    }
    /**
     * Builds a decoded copy of a message.
     *
     * @param {object} data Message carrying optional `value`, `key` and
     *   `headers` properties; any other properties are copied through as-is.
     * @returns {object} A new object whose `headers` is always an object
     *   (empty when the input had none). `data` itself is never modified.
     */
    parse(data) {
        // Work on a copy: modifying the original message would break KafkaJS retries.
        // Spreading nil headers yields `{}`, so no separate "missing headers" branch is needed.
        const result = {
            ...data,
            headers: { ...data.headers },
        };
        if (!this.keepBinary) {
            result.value = this.decode(data.value);
        }
        if (!(0, shared_utils_1.isNil)(data.key)) {
            result.key = this.decode(data.key);
        }
        // The copy holds the same own enumerable keys as the source headers,
        // so each entry can be decoded in place on the copy.
        for (const headerName of Object.keys(result.headers)) {
            result.headers[headerName] = this.decode(result.headers[headerName]);
        }
        return result;
    }
    /**
     * Decodes a single key, value or header.
     *
     * @param {*} value Usually a Buffer; anything exposing `toString()` works.
     * @returns {*} `null` for nil input; the untouched Buffer when its first
     *   byte is zero; the parsed JSON when the text starts with `{` or `[`
     *   and parses successfully; otherwise the text itself.
     */
    decode(value) {
        if ((0, shared_utils_1.isNil)(value)) {
            return null;
        }
        // A value with the "leading zero byte" indicates the schema payload.
        // The "content" is possibly binary and should not be touched & parsed.
        const isSchemaPayload = Buffer.isBuffer(value) &&
            value.length > 0 &&
            value.readUInt8(0) === 0;
        if (isSchemaPayload) {
            return value;
        }
        // Convert to text exactly once and reuse it for both the prefix check
        // and the JSON parse.
        const text = value.toString();
        const startChar = text.charAt(0);
        // Only objects and arrays are parsed; scalars stay as text.
        if (startChar !== '{' && startChar !== '[') {
            return text;
        }
        try {
            return JSON.parse(text);
        }
        catch (e) {
            // Deliberate best effort: text that merely looks like JSON is
            // handed back unchanged instead of failing the whole message.
            return text;
        }
    }
}
55exports.KafkaParser = KafkaParser;