1 | "use strict";
|
2 | Object.defineProperty(exports, "__esModule", { value: true });
|
3 | exports.KafkaReplyPartitionAssigner = void 0;
|
4 | const load_package_util_1 = require("@nestjs/common/utils/load-package.util");
|
5 | const shared_utils_1 = require("@nestjs/common/utils/shared.utils");
|
6 | let kafkaPackage = {};
|
/**
 * Group partition assigner that tries to keep every member on the partition
 * it previously held for each topic, and then hands out whatever is left.
 *
 * Exposes `name`, `version`, `assign(group)` and `protocol(subscription)`.
 * NOTE(review): this looks like the KafkaJS custom partition assigner
 * contract - confirm against the KafkaJS documentation.
 */
class KafkaReplyPartitionAssigner {
    /**
     * @param {object} clientKafka Object exposing `getConsumerAssignments()`;
     *   its result is advertised to the group as this member's previous assignment.
     * @param {object} config Object exposing `cluster.findTopicPartitionMetadata(topic)`.
     */
    constructor(clientKafka, config) {
        this.clientKafka = clientKafka;
        this.config = config;
        this.name = 'NestReplyPartitionAssigner';
        this.version = 1;
        // Stores the lazily loaded `kafkajs` module in the module-level holder
        // used by the encode/decode calls below.
        kafkaPackage = (0, load_package_util_1.loadPackage)('kafkajs', KafkaReplyPartitionAssigner.name, () => require('kafkajs'));
    }

    /**
     * Computes the partition assignment for every member of the group.
     *
     * The assignment is built in three passes over the members (sorted by id):
     *   1. a member keeps the partition it previously held for a topic, but only
     *      if that partition is still available;
     *   2. a member that still has nothing for a topic takes the first free
     *      partition of that topic;
     *   3. remaining partitions are dealt out round-robin.
     * The result can therefore be imbalanced.
     *
     * @param {{members: Array<{memberId: string, memberMetadata: *}>, topics: string[]}} group
     * @returns {Promise<Array<{memberId: string, memberAssignment: *}>>}
     *   One entry per member, with the assignment encoded through
     *   `AssignerProtocol.MemberAssignment.encode`.
     */
    async assign(group) {
        const assignment = {};
        const previousAssignment = {};
        const membersCount = group.members.length;

        // Without members there is nobody to assign to; the round-robin step
        // below would otherwise index with `i % 0` (NaN) and throw.
        if (membersCount === 0) {
            return [];
        }

        const decodedMembers = group.members.map(member => this.decodeMember(member));
        const sortedMemberIds = decodedMembers
            .map(member => member.memberId)
            .sort();

        // Collect the non-empty previous assignments, keyed by member id.
        decodedMembers.forEach(member => {
            if (!previousAssignment[member.memberId] &&
                Object.keys(member.previousAssignment).length > 0) {
                previousAssignment[member.memberId] = member.previousAssignment;
            }
        });

        // Flat pool of every { topic, partitionId } that is still unassigned.
        const topicsPartitions = group.topics
            .map(topic => {
                const partitionMetadata = this.config.cluster.findTopicPartitionMetadata(topic);
                return partitionMetadata.map(m => ({
                    topic,
                    partitionId: m.partitionId,
                }));
            })
            .reduce((acc, val) => acc.concat(val), []);

        // Pass 1: let members keep their previous partition when it is still free.
        sortedMemberIds.forEach(assignee => {
            if (!assignment[assignee]) {
                assignment[assignee] = {};
            }
            group.topics.forEach(topic => {
                if (!assignment[assignee][topic]) {
                    assignment[assignee][topic] = [];
                }
                if (previousAssignment[assignee] &&
                    !(0, shared_utils_1.isUndefined)(previousAssignment[assignee][topic])) {
                    const firstPartition = previousAssignment[assignee][topic];
                    const topicsPartitionsIndex = topicsPartitions.findIndex(topicPartition => {
                        return (topicPartition.topic === topic &&
                            topicPartition.partitionId === firstPartition);
                    });
                    // Only honour the previous partition if it is still in the pool.
                    // Otherwise it was already claimed by another member or is not
                    // present in the cluster metadata, and assigning it again would
                    // produce a duplicate or stale assignment.
                    if (topicsPartitionsIndex !== -1) {
                        topicsPartitions.splice(topicsPartitionsIndex, 1);
                        assignment[assignee][topic].push(firstPartition);
                    }
                }
            });
        });

        // Pass 2: give every member without a partition for a topic the first free one.
        sortedMemberIds.forEach(assignee => {
            group.topics.forEach(topic => {
                if (assignment[assignee][topic].length === 0) {
                    const topicsPartitionsIndex = topicsPartitions.findIndex(topicPartition => {
                        return topicPartition.topic === topic;
                    });
                    if (topicsPartitionsIndex !== -1) {
                        const partition = topicsPartitions[topicsPartitionsIndex].partitionId;
                        assignment[assignee][topic].push(partition);
                        topicsPartitions.splice(topicsPartitionsIndex, 1);
                    }
                }
            });
        });

        // Pass 3: deal the remaining partitions out round-robin over the sorted members.
        const insertAssignmentsByTopic = (topicPartition, i) => {
            const assignee = sortedMemberIds[i % membersCount];
            assignment[assignee][topicPartition.topic].push(topicPartition.partitionId);
        };
        topicsPartitions.forEach(insertAssignmentsByTopic);

        return Object.keys(assignment).map(memberId => ({
            memberId,
            memberAssignment: kafkaPackage.AssignerProtocol.MemberAssignment.encode({
                version: this.version,
                assignment: assignment[memberId],
            }),
        }));
    }

    /**
     * Builds the protocol entry this member advertises to the group.
     *
     * Side effect: overwrites `subscription.userData` with a Buffer holding
     * the JSON `{ previousAssignment }` of this member.
     *
     * @param {{topics: string[], userData?: Buffer}} subscription
     * @returns {{name: string, metadata: *}} Assigner name plus the metadata
     *   produced by `AssignerProtocol.MemberMetadata.encode`.
     */
    protocol(subscription) {
        const stringifiedUserData = JSON.stringify({
            previousAssignment: this.getPreviousAssignment(),
        });
        subscription.userData = Buffer.from(stringifiedUserData);
        return {
            name: this.name,
            metadata: kafkaPackage.AssignerProtocol.MemberMetadata.encode({
                version: this.version,
                topics: subscription.topics,
                userData: subscription.userData,
            }),
        };
    }

    /**
     * @returns {*} Whatever `clientKafka.getConsumerAssignments()` returns.
     *   `assign` reads it as a map of topic name to a single partition id.
     */
    getPreviousAssignment() {
        return this.clientKafka.getConsumerAssignments();
    }

    /**
     * Decodes a group member's metadata and extracts its previous assignment
     * from the JSON user data.
     *
     * @param {{memberId: string, memberMetadata: *}} member
     * @returns {{memberId: string, previousAssignment: object}}
     */
    decodeMember(member) {
        const memberMetadata = kafkaPackage.AssignerProtocol.MemberMetadata.decode(member.memberMetadata);
        const memberUserData = JSON.parse(memberMetadata.userData.toString());
        return {
            memberId: member.memberId,
            // JSON.stringify drops `undefined` values, so the key can be missing;
            // fall back to an empty map so `assign` can inspect it safely.
            previousAssignment: memberUserData.previousAssignment || {},
        };
    }
}
|
140 | exports.KafkaReplyPartitionAssigner = KafkaReplyPartitionAssigner;
|