UNPKG

1.21 kBJavaScriptView Raw
1'use strict';
2
3const Base = require('sdk-base');
4const bsInsert = require('binary-search-insert');
/**
 * Ordering function for messages: ascending by `queueOffset`.
 *
 * @param {{queueOffset: number}} a - left-hand message
 * @param {{queueOffset: number}} b - right-hand message
 * @return {number} negative when `a` sorts first, positive when `b` sorts
 *   first, and 0 when both carry the same offset
 */
const comparator = (a, b) => {
  const leftOffset = a.queueOffset;
  const rightOffset = b.queueOffset;
  return leftOffset - rightOffset;
};
6
// Idle threshold in milliseconds (two minutes). `isPullExpired` reports true
// once more than this much time has passed since `lastPullTimestamp`.
const pullMaxIdleTime = 2 * 60 * 1000;
8
/**
 * In-memory buffer of messages, kept in `msgList` ordered by ascending
 * `queueOffset` (ordering is delegated to `binary-search-insert` together
 * with the module-level `comparator`).
 *
 * Besides the buffer itself the instance carries a few bookkeeping fields
 * (`droped`, `locked` and three timestamps) that this class only initialises;
 * of these, only `lastPullTimestamp` is read here (by `isPullExpired`).
 */
class ProcessQueue extends Base {
  /**
   * @param {Object} [options] - forwarded unchanged to the `sdk-base` constructor
   */
  constructor(options = {}) {
    super(options);

    // Messages waiting to be consumed, sorted by queueOffset.
    this.msgList = [];
    // queueOffset of the last (highest) buffered message as of the most recent
    // non-empty putMessage(). Declared here so the property always exists;
    // it stays undefined until the first message arrives.
    this.queueOffsetMax = undefined;
    // NOTE: the spelling "droped" (sic) is part of the public surface and is
    // kept as-is so external readers/writers of the flag keep working.
    this.droped = false;
    this.lastPullTimestamp = Date.now();
    this.lastConsumeTimestamp = Date.now();

    this.locked = false;
    this.lastLockTimestamp = Date.now();
  }

  /**
   * Distance between the highest and the lowest buffered `queueOffset`.
   * Relies on `msgList` being sorted, so only the two ends are inspected.
   *
   * @return {number} the offset span, or 0 when the buffer is empty
   */
  get maxSpan() {
    const msgCount = this.msgCount;
    if (msgCount === 0) {
      return 0;
    }
    const first = this.msgList[0];
    const last = this.msgList[msgCount - 1];
    return last.queueOffset - first.queueOffset;
  }

  /**
   * @return {number} number of messages currently buffered
   */
  get msgCount() {
    return this.msgList.length;
  }

  /**
   * @return {boolean} true when more than `pullMaxIdleTime` milliseconds have
   *   elapsed since `lastPullTimestamp`
   */
  get isPullExpired() {
    return Date.now() - this.lastPullTimestamp > pullMaxIdleTime;
  }

  /**
   * Inserts every message of `msgs` into the sorted buffer and refreshes
   * `queueOffsetMax` from the last buffered message.
   *
   * @param {Iterable<{queueOffset: number}>} msgs - messages to buffer; may be empty
   */
  putMessage(msgs) {
    for (const msg of msgs) {
      bsInsert(this.msgList, comparator, msg);
    }
    // An empty batch on an empty buffer leaves nothing to read the offset
    // from; indexing msgList[-1] here used to throw a TypeError.
    const msgCount = this.msgCount;
    if (msgCount > 0) {
      this.queueOffsetMax = this.msgList[msgCount - 1].queueOffset;
    }
  }

  /**
   * Drops the first `count` messages (the ones with the lowest offsets).
   *
   * @param {number} [count=1] - how many messages to remove from the front
   * @return {number} the `queueOffset` of the new first message; when the
   *   buffer is now empty, `queueOffsetMax + 1`; or -1 when no message has
   *   ever been buffered (previously this case produced NaN)
   */
  remove(count = 1) {
    this.msgList.splice(0, count);
    if (this.msgCount > 0) {
      return this.msgList[0].queueOffset;
    }
    // NOTE(review): -1 is chosen as the "no offset" sentinel to match the
    // convention of the Apache RocketMQ Java client - confirm callers treat a
    // negative result as "nothing to commit".
    return this.queueOffsetMax === undefined ? -1 : this.queueOffsetMax + 1;
  }

  /**
   * Discards all buffered messages. `queueOffsetMax` is intentionally left
   * untouched, so a following `remove()` still reports `queueOffsetMax + 1`.
   */
  clear() {
    this.msgList = [];
  }
}
54
55module.exports = ProcessQueue;