UNPKG

1.88 kBJavaScriptView Raw
1'use strict';
2var Readable = require('readable-stream').Readable,
3 streamsOpts = { objectMode: true };
4
/**
 * In-memory implementation of the message store.
 *
 * Packets are kept in a plain object (`_inflights`) keyed by their
 * messageId. The constructor may be invoked with or without `new`.
 *
 * @constructor
 */
function Store () {
  var calledWithNew = this instanceof Store;

  // Support factory-style invocation: Store() behaves like new Store().
  if (!calledWithNew) {
    return new Store();
  }

  this._inflights = {};
}
17
/**
 * Stores a packet under its messageId, replacing any packet already
 * stored under the same id.
 *
 * @param {Object} packet - anything that has a messageId property
 * @param {Function} [cb] - invoked with no arguments once the packet is stored
 * @returns {Store} this store, for chaining
 */
Store.prototype.put = function (packet, cb) {
  var store = this;

  store._inflights[packet.messageId] = packet;

  if (!cb) {
    return store;
  }

  cb();
  return store;
};
32
/**
 * Creates an object-mode readable stream over the packets in the store.
 *
 * The list of ids is snapshotted when the stream is created, so ids
 * added afterwards are not visited. Ids whose packet has been deleted
 * by the time the stream reaches them are skipped instead of being
 * pushed as `undefined`.
 *
 * @returns {Readable} stream that emits each stored packet, then ends
 */
Store.prototype.createStream = function () {
  var stream = new Readable(streamsOpts),
    inflights = this._inflights,
    ids = Object.keys(inflights),
    destroyed = false,
    i = 0;

  stream._read = function () {
    var packet;

    // Advance past ids whose packet was removed after the snapshot was
    // taken; exactly one packet (or the end marker) is pushed per call.
    while (!destroyed && i < ids.length) {
      packet = inflights[ids[i++]];
      if (packet) {
        this.push(packet);
        return;
      }
    }

    this.push(null);
  };

  stream.destroy = function () {
    if (destroyed) {
      return;
    }

    var self = this;

    // After this flag is set, the next _read ends the stream.
    destroyed = true;

    // Emit 'close' asynchronously so listeners attached right after
    // destroy() in the same tick still receive it.
    process.nextTick(function () {
      self.emit('close');
    });
  };

  return stream;
};
68
/**
 * Deletes a packet from the store.
 *
 * The packet is looked up by the messageId of the given object. The
 * callback is optional on both the success and the failure path.
 *
 * @param {Object} packet - object whose messageId identifies the packet
 * @param {Function} [cb] - called with (null, storedPacket) when the
 *   packet was found and removed, or with an Error('missing packet')
 *   when nothing is stored under that id
 * @returns {Store} this store, for chaining
 */
Store.prototype.del = function (packet, cb) {
  var id = packet.messageId,
    stored = this._inflights[id];

  if (stored) {
    // Delete by the id used for the lookup so the entry that was found
    // is the one that gets removed.
    delete this._inflights[id];
  }

  if (cb) {
    if (stored) {
      cb(null, stored);
    } else {
      cb(new Error('missing packet'));
    }
  }

  return this;
};
83
/**
 * Gets a packet from the store without removing it.
 *
 * The packet is looked up by the messageId of the given object. The
 * callback is optional on both the success and the failure path.
 *
 * @param {Object} packet - object whose messageId identifies the packet
 * @param {Function} [cb] - called with (null, storedPacket) when the
 *   packet was found, or with an Error('missing packet') when nothing
 *   is stored under that id
 * @returns {Store} this store, for chaining
 */
Store.prototype.get = function (packet, cb) {
  var stored = this._inflights[packet.messageId];

  if (cb) {
    if (stored) {
      cb(null, stored);
    } else {
      cb(new Error('missing packet'));
    }
  }

  return this;
};
97
/**
 * Closes the store by dropping its reference to the packet map
 * (`_inflights` becomes null).
 *
 * @param {Function} [cb] - invoked with no arguments after the store is closed
 */
Store.prototype.close = function (cb) {
  this._inflights = null;

  if (!cb) {
    return;
  }

  cb();
};
107
108module.exports = Store;