UNPKG

2.66 kBJavaScriptView Raw
1'use strict'
2
3/**
4 * Module dependencies
5 */
6var xtend = require('xtend')
7
8var Readable = require('readable-stream').Readable
9var streamsOpts = { objectMode: true }
10var defaultStoreOptions = {
11 clean: true
12}
13
14/**
15 * es6-map can preserve insertion order even if ES version is older.
16 *
17 * https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Map#Description
18 * It should be noted that a Map which is a map of an object, especially
19 * a dictionary of dictionaries, will only map to the object's insertion
20 * order. In ES2015 this is ordered for objects but for older versions of
21 * ES, this may be random and not ordered.
22 *
23 */
24var Map = require('es6-map')
25
26/**
27 * In-memory implementation of the message store
28 * This can actually be saved into files.
29 *
30 * @param {Object} [options] - store options
31 */
32function Store (options) {
33 if (!(this instanceof Store)) {
34 return new Store(options)
35 }
36
37 this.options = options || {}
38
39 // Defaults
40 this.options = xtend(defaultStoreOptions, options)
41
42 this._inflights = new Map()
43}
44
45/**
46 * Adds a packet to the store, a packet is
47 * anything that has a messageId property.
48 *
49 */
50Store.prototype.put = function (packet, cb) {
51 this._inflights.set(packet.messageId, packet)
52
53 if (cb) {
54 cb()
55 }
56
57 return this
58}
59
60/**
61 * Creates a stream with all the packets in the store
62 *
63 */
64Store.prototype.createStream = function () {
65 var stream = new Readable(streamsOpts)
66 var destroyed = false
67 var values = []
68 var i = 0
69
70 this._inflights.forEach(function (value, key) {
71 values.push(value)
72 })
73
74 stream._read = function () {
75 if (!destroyed && i < values.length) {
76 this.push(values[i++])
77 } else {
78 this.push(null)
79 }
80 }
81
82 stream.destroy = function () {
83 if (destroyed) {
84 return
85 }
86
87 var self = this
88
89 destroyed = true
90
91 process.nextTick(function () {
92 self.emit('close')
93 })
94 }
95
96 return stream
97}
98
99/**
100 * deletes a packet from the store.
101 */
102Store.prototype.del = function (packet, cb) {
103 packet = this._inflights.get(packet.messageId)
104 if (packet) {
105 this._inflights.delete(packet.messageId)
106 cb(null, packet)
107 } else if (cb) {
108 cb(new Error('missing packet'))
109 }
110
111 return this
112}
113
114/**
115 * get a packet from the store.
116 */
117Store.prototype.get = function (packet, cb) {
118 packet = this._inflights.get(packet.messageId)
119 if (packet) {
120 cb(null, packet)
121 } else if (cb) {
122 cb(new Error('missing packet'))
123 }
124
125 return this
126}
127
128/**
129 * Close the store
130 */
131Store.prototype.close = function (cb) {
132 if (this.options.clean) {
133 this._inflights = null
134 }
135 if (cb) {
136 cb()
137 }
138}
139
140module.exports = Store