UNPKG

6.64 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3exports.KafkaReplyPartitionAssigner = void 0;
4const load_package_util_1 = require("@nestjs/common/utils/load-package.util");
5const shared_utils_1 = require("@nestjs/common/utils/shared.utils");
6let kafkaPackage = {};
/**
 * Partition assigner that tries to keep each group member on the partition
 * it reported holding previously, then hands out the remaining partitions.
 *
 * The member's previous assignment travels inside the `userData` field of the
 * member metadata (see `protocol` and `decodeMember`).
 */
class KafkaReplyPartitionAssigner {
    /**
     * @param {object} clientKafka object exposing `getConsumerAssignments()`
     * @param {object} config object whose `cluster.findTopicPartitionMetadata(topic)`
     *   returns the partition metadata of a topic
     */
    constructor(clientKafka, config) {
        this.clientKafka = clientKafka;
        this.config = config;
        this.name = 'NestReplyPartitionAssigner';
        this.version = 1;
        // the loaded package is stored in the module-level `kafkaPackage` holder
        kafkaPackage = (0, load_package_util_1.loadPackage)('kafkajs', KafkaReplyPartitionAssigner.name, () => require('kafkajs'));
    }
    /**
     * Computes the partition assignment for every member of the group.
     *
     * This process can result in imbalanced assignments.
     *
     * @param {object} group
     * @param {array} group.members array of members, e.g:
     *   [{ memberId: 'test-5f93f5a3', memberMetadata: <encoded metadata> }]
     * @param {array} group.topics names of the topics to assign
     * @returns {Promise<array>} one `{ memberId, memberAssignment }` entry per
     *   member, where `memberAssignment` is the encoded topic > partitions map
     */
    async assign(group) {
        const assignment = {};
        const previousAssignment = {};
        const decodedMembers = group.members.map(member => this.decodeMember(member));
        const sortedMemberIds = decodedMembers
            .map(member => member.memberId)
            .sort();
        const membersCount = sortedMemberIds.length;
        // nothing to assign to; also avoids a modulo-by-zero in the balancing step
        if (membersCount === 0) {
            return [];
        }
        // collect the non-empty previous assignment reported by each member
        for (const member of decodedMembers) {
            if (!previousAssignment[member.memberId] &&
                member.previousAssignment &&
                Object.keys(member.previousAssignment).length > 0) {
                previousAssignment[member.memberId] = member.previousAssignment;
            }
        }
        // build the flat collection of { topic, partitionId } still to be assigned
        const topicsPartitions = group.topics
            .map(topic => {
                const partitionMetadata = this.config.cluster.findTopicPartitionMetadata(topic);
                return partitionMetadata.map(m => ({
                    topic,
                    partitionId: m.partitionId,
                }));
            })
            .reduce((acc, val) => acc.concat(val), []);
        // first pass: give every member back the partition it previously held
        for (const assignee of sortedMemberIds) {
            if (!assignment[assignee]) {
                assignment[assignee] = {};
            }
            for (const topic of group.topics) {
                if (!assignment[assignee][topic]) {
                    assignment[assignee][topic] = [];
                }
                const previous = previousAssignment[assignee];
                if (!previous || (0, shared_utils_1.isUndefined)(previous[topic])) {
                    continue;
                }
                // the previous assignment stores a single partition per topic
                // (the original note says replies are sent to the minimum partition)
                const firstPartition = previous[topic];
                const availableIndex = topicsPartitions.findIndex(topicPartition => topicPartition.topic === topic &&
                    topicPartition.partitionId === firstPartition);
                // Only honour the previous partition while it is still available.
                // A partition already claimed by another member, or one that is no
                // longer part of the topic metadata, must not be assigned again;
                // the member gets a fresh partition in the next pass instead.
                if (availableIndex === -1) {
                    continue;
                }
                assignment[assignee][topic].push(firstPartition);
                topicsPartitions.splice(availableIndex, 1);
            }
        }
        // second pass: members that still have no partition for a topic get the
        // first partition of that topic that is left
        for (const assignee of sortedMemberIds) {
            for (const topic of group.topics) {
                if (assignment[assignee][topic].length !== 0) {
                    continue;
                }
                const availableIndex = topicsPartitions.findIndex(topicPartition => topicPartition.topic === topic);
                if (availableIndex !== -1) {
                    assignment[assignee][topic].push(topicsPartitions[availableIndex].partitionId);
                    topicsPartitions.splice(availableIndex, 1);
                }
            }
        }
        // finally spread whatever is left across the members in round-robin order
        topicsPartitions.forEach((topicPartition, i) => {
            const assignee = sortedMemberIds[i % membersCount];
            assignment[assignee][topicPartition.topic].push(topicPartition.partitionId);
        });
        // encode the end result
        return Object.keys(assignment).map(memberId => ({
            memberId,
            memberAssignment: kafkaPackage.AssignerProtocol.MemberAssignment.encode({
                version: this.version,
                assignment: assignment[memberId],
            }),
        }));
    }
    /**
     * Builds the protocol entry for this assigner.
     *
     * Note: this overwrites `subscription.userData` with a buffer holding the
     * JSON-serialised previous assignment of this client.
     *
     * @param {object} subscription object carrying the subscribed `topics`
     * @returns {object} `{ name, metadata }` with the encoded member metadata
     */
    protocol(subscription) {
        const stringifiedUserData = JSON.stringify({
            previousAssignment: this.getPreviousAssignment(),
        });
        subscription.userData = Buffer.from(stringifiedUserData);
        return {
            name: this.name,
            metadata: kafkaPackage.AssignerProtocol.MemberMetadata.encode({
                version: this.version,
                topics: subscription.topics,
                userData: subscription.userData,
            }),
        };
    }
    /**
     * @returns whatever `clientKafka.getConsumerAssignments()` reports
     */
    getPreviousAssignment() {
        return this.clientKafka.getConsumerAssignments();
    }
    /**
     * Decodes the metadata of a group member and extracts the previous
     * assignment that `protocol` stored in its user data.
     *
     * @param {object} member `{ memberId, memberMetadata }`
     * @returns {object} `{ memberId, previousAssignment }`; `previousAssignment`
     *   is an empty object when the user data does not carry one
     * @throws {SyntaxError} when the user data is not valid JSON
     */
    decodeMember(member) {
        const memberMetadata = kafkaPackage.AssignerProtocol.MemberMetadata.decode(member.memberMetadata);
        const memberUserData = JSON.parse(memberMetadata.userData.toString());
        const reported = memberUserData ? memberUserData.previousAssignment : undefined;
        return {
            memberId: member.memberId,
            // normalise a missing/null value so callers can always enumerate it
            previousAssignment: typeof reported === 'object' && reported !== null ? reported : {},
        };
    }
}
140exports.KafkaReplyPartitionAssigner = KafkaReplyPartitionAssigner;