All files / confluent-kafka-javascript/lib/kafkajs _consumer_cache.js

77.41% Statements 72/93
56.75% Branches 21/37
66.66% Functions 12/18
77.17% Lines 71/92

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286      1x 1x               22x   22x   22x     22x                           223x       21x                           44x 44x   44x 44x 223x   44x                                       1x     1x 1x             3x 3x 3x 3x                 92x 92x 70x   22x 22x 22x                 21x 21x 21x 21x                   223x 223x 223x 22x 22x 22x 22x   223x               89x                             2x 4x 4x 4x 4x                                       39x 223x 39x                     2x 1x 1x 1x 1x 1x 1x                                                                           114x 92x 114x 70x   44x   44x 21x 21x     23x 23x               2x 1x   2x             99x 99x                     1x  
const {
    partitionKey,
    DeferredPromise,
} = require('./_common');
const { LinkedList } = require('./_linked-list');
 
/**
 * Holds the cached messages of exactly one topic partition.
 * Messages leave the cache in the same order in which they were added.
 * @private
 */
class PerPartitionMessageCache {
    /* Pending messages; appended at the tail, consumed from the head. */
    #messages = new LinkedList();
    /* Key identifying the partition this cache belongs to. */
    #tpKey = null;
    /* Whether the cache is currently assigned to a consumer. */
    _assigned = false;

    /**
     * @param {*} key - the partition key, handed back unchanged by the `key` getter.
     */
    constructor(key) {
        this.#tpKey = key;
    }

    /**
     * The key this cache was created with.
     */
    get key() {
        return this.#tpKey;
    }

    /**
     * @returns The number of messages currently held.
     */
    size() {
        return this.#messages.length;
    }

    /**
     * Appends a message behind all messages already cached.
     */
    _add(message) {
        this.#messages.addLast(message);
    }

    /**
     * Removes the oldest message from the cache.
     *
     * @returns Whatever the underlying list's removeFirst() yields; presumably
     *          null once the cache is empty (verify against LinkedList).
     */
    _next() {
        return this.#messages.removeFirst();
    }

    /**
     * Removes up to `n` of the oldest messages from the cache.
     * A negative `n`, or an `n` larger than the cache, means "everything".
     *
     * @returns An array with the removed messages, oldest first; empty if the cache holds nothing.
     */
    _nextN(n) {
        const available = this.#messages.length;

        let count = n;
        if (n < 0 || available < n) {
            count = available;
        }

        const batch = new Array(count);
        let idx = 0;
        while (idx < count) {
            batch[idx] = this.#messages.removeFirst();
            idx++;
        }
        return batch;
    }
}
 
 
/**
 * MessageCache defines a dynamically sized cache for messages.
 * Internally, it uses PerPartitionMessageCache to store messages for each partition.
 *
 * Each per-partition cache (PPC) tracked in the map sits in exactly one of two
 * lists: the "available" list (not handed out) or the "assigned" list (handed
 * out through next()/nextN()). `ppc._assigned` records which list that is, and
 * `ppc._node` holds the handle returned by that list's addLast(), which is
 * later passed to remove().
 * @private
 */
class MessageCache {
    /* Total number of messages currently cached across all partitions. */
    #size;
    /* Map of topic+partition to PerPartitionMessageCache. */
    #tpToPpc;
    /* LinkedList of available partitions. */
    #availablePartitions;
    /* LinkedList of assigned partitions. */
    #assignedPartitions;

    /* Promise that is resolved when there are available partitions. */
    #availablePartitionsPromise = new DeferredPromise();

    /**
     * @param {Object} [logger] - logger to keep on `this.logger`; `console` is used when null/undefined.
     */
    constructor(logger) {
        this.logger = logger ?? console;
        this.#reinit();
    }

    /**
     * Reinitializes the cache: empty map, empty lists, size zero.
     */
    #reinit() {
        this.#tpToPpc = new Map();
        this.#availablePartitions = new LinkedList();
        this.#assignedPartitions = new LinkedList();
        this.#size = 0;
    }

    /**
     * Assign a new partition to the consumer, if available.
     * The partition is taken from the head of the available list and appended to the assigned list.
     *
     * @returns {PerPartitionMessageCache} - the partition assigned to the consumer, or null if none available.
     */
    #assignNewPartition() {
        const ppc = this.#availablePartitions.removeFirst();
        if (!ppc) {
            return null;
        }

        ppc._node = this.#assignedPartitions.addLast(ppc);
        ppc._assigned = true;
        return ppc;
    }

    /**
     * Remove an empty partition from the cache.
     * The partition must currently be in the assigned list.
     *
     * @param {PerPartitionMessageCache} ppc The partition to remove from the cache.
     */
    #removeEmptyPartition(ppc) {
        this.#assignedPartitions.remove(ppc._node);
        ppc._assigned = false;
        ppc._node = null;
        this.#tpToPpc.delete(ppc.key);
    }

    /**
     * Add a single message to a PPC.
     * In case the PPC does not exist, it is created, placed in the available
     * list, and awaiters of availablePartitions() are notified.
     *
     * Does not touch the total size; addMessages() accounts for that.
     *
     * @param {Object} message - the message to add to the cache.
     */
    #add(message) {
        const key = partitionKey(message);
        let cache = this.#tpToPpc.get(key);
        if (!cache) {
            cache = new PerPartitionMessageCache(key);
            this.#tpToPpc.set(key, cache);
            cache._node = this.#availablePartitions.addLast(cache);
            this.notifyAvailablePartitions();
        }
        cache._add(message);
    }

    /** Number of partitions in the available list. */
    get availableSize() {
        return this.#availablePartitions.length;
    }

    /** Number of partitions in the assigned list. */
    get assignedSize() {
        return this.#assignedPartitions.length;
    }

    /** Total number of cached messages. */
    get size() {
        return this.#size;
    }

    /**
     * Mark a set of topic partitions 'stale'.
     *
     * Partitions without a PPC in the cache are skipped.
     *
     * Post-conditions: PPCs are removed from the list they currently sit in
     * and deleted from the PPC map. Cache size is decremented accordingly.
     * PPCs are marked as not assigned and lose their list handle.
     *
     * @param {Iterable} topicPartitions - objects accepted by partitionKey().
     */
    markStale(topicPartitions) {
        for (const topicPartition of topicPartitions) {
            const key = partitionKey(topicPartition);
            const ppc = this.#tpToPpc.get(key);
            if (!ppc) {
                // Nothing is cached for this partition.
                continue;
            }

            this.#size -= ppc.size();
            const owningList = ppc._assigned ? this.#assignedPartitions : this.#availablePartitions;
            owningList.remove(ppc._node);
            this.#tpToPpc.delete(key);
            ppc._assigned = false;
            // Same cleanup as #removeEmptyPartition: the handle no longer belongs to any list.
            ppc._node = null;
        }
    }

    /**
     * Adds many messages into the cache, partitioning them as per their toppar.
     * Increases cache size by the number of messages added.
     *
     * @param {Array} messages - the messages to add to the cache.
     */
    addMessages(messages) {
        for (const message of messages) {
            this.#add(message);
        }
        this.#size += messages.length;
    }

    /**
     * Allows returning the PPC without asking for another message.
     * The PPC moves from the assigned list to the tail of the available list
     * and awaiters of availablePartitions() are notified.
     *
     * @param {PerPartitionMessageCache} ppc - the partition to return.
     *
     * @note this is a no-op if the PPC is not assigned.
     */
    return(ppc) {
        if (!ppc._assigned) {
            return;
        }
        if (ppc._node) {
            this.#assignedPartitions.remove(ppc._node);
            ppc._node = this.#availablePartitions.addLast(ppc);
            ppc._assigned = false;
            this.notifyAvailablePartitions();
        }
    }

    /**
     * Returns the next element in the cache, or null if none exists.
     *
     * If the current PPC is exhausted, it is removed and the next available PPC is tried.
     * If all PPCs are exhausted, it returns null.
     *
     * @param {PerPartitionMessageCache} ppc - after a consumer has consumed a message, it must return the PPC back to us via this parameter.
     *                       otherwise, no messages from that topic partition will be consumed.
     * @returns {Array} - `[message, ppc]`: the next message and the PPC it came from, or null if none exists.
     * @note Whenever making changes to this function, ensure that you benchmark perf.
     */
    next(ppc = null) {
        if (!ppc || !ppc._assigned) {
            ppc = this.#assignNewPartition();
        }
        if (!ppc) {
            return null;
        }

        const next = ppc._next();

        if (!next) {
            this.#removeEmptyPartition(ppc);
            return this.next();
        }

        this.#size--;
        return [next, ppc];
    }

    /**
     * Returns the next `size` elements in the cache as an array, or null if none exists.
     *
     * @param {PerPartitionMessageCache} ppc - the PPC handed out by a previous call, if any.
     * @param {number} size - maximum number of messages to take; a negative value takes all
     *                        messages of the partition.
     * @returns {Array} - `[messages, ppc]`, or null if none exists.
     *
     * @note NOTE(review): an empty batch is treated as "partition exhausted", so a `size`
     *       of 0 would remove a partition that still holds messages. It is unclear whether
     *       any caller passes 0; confirm before relying on it.
     * @sa next, the behaviour is similar in other aspects.
     */
    nextN(ppc = null, size = -1) {
        if (!ppc || !ppc._assigned) {
            ppc = this.#assignNewPartition();
        }
        if (!ppc) {
            return null;
        }

        const nextN = ppc._nextN(size);

        if (!nextN.length) {
            this.#removeEmptyPartition(ppc);
            return this.nextN(null, size);
        }

        this.#size -= nextN.length;
        return [nextN, ppc];
    }

    /**
     * Clears the cache completely.
     * This resets it to a base state. PPCs still held by consumers are marked
     * as not assigned, so a later return()/next() with them does not touch the new lists.
     */
    clear() {
        for (const ppc of this.#tpToPpc.values()) {
            ppc._assigned = false;
            // The lists these handles belong to are discarded by #reinit().
            ppc._node = null;
        }
        this.#reinit();
    }

    /**
     * Notifies awaiters that there are available partitions to take.
     * The current promise is resolved and replaced by a fresh one for later awaiters.
     */
    notifyAvailablePartitions() {
        this.#availablePartitionsPromise.resolve();
        this.#availablePartitionsPromise = new DeferredPromise();
    }

    /**
     * Promise that resolves when there are available partitions to take,
     * i.e. on the next notifyAvailablePartitions() call.
     */
    async availablePartitions() {
        return this.#availablePartitionsPromise;
    }
}
 
/* Only MessageCache is exported; PerPartitionMessageCache is not part of this module's exports. */
module.exports = MessageCache;