1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 |
|
19 | var defs = require('./defs');
|
20 | var constants = defs.constants;
|
21 | var decode = defs.decode;
|
22 | var encodeMethod = defs.encodeMethod;
|
23 | var encodeProperties = defs.encodeProperties;
|
24 |
|
25 | var FRAME_METHOD = constants.FRAME_METHOD,
|
26 | FRAME_HEARTBEAT = constants.FRAME_HEARTBEAT,
|
27 | FRAME_HEADER = constants.FRAME_HEADER,
|
28 | FRAME_BODY = constants.FRAME_BODY,
|
29 | FRAME_END = constants.FRAME_END;
|
30 |
|
31 | var Bits = require('bitsyntax');
|
32 | var Stream = require('stream');
|
33 | var Duplex =
|
34 | require('stream').Duplex ||
|
35 | require('readable-stream/duplex');
|
36 | var EventEmitter = require('events').EventEmitter;
|
37 | var inherits = require('util').inherits;
|
38 |
|
39 | var FRAME_OVERHEAD = 8;
|
40 |
|
41 | function wrapStream(s) {
|
42 | if (s instanceof Duplex) return s;
|
43 | else {
|
44 | var ws = new Duplex();
|
45 | ws.wrap(s);
|
46 | ws._write = function(chunk, encoding, callback) {
|
47 | return s.write(chunk, encoding, callback);
|
48 | };
|
49 | return ws;
|
50 | }
|
51 | }
|
52 |
|
53 |
|
54 |
|
55 |
|
56 | function Frames(stream) {
|
57 | EventEmitter.call(this);
|
58 | this.stream = wrapStream(stream);
|
59 |
|
60 | this.stream.once('end', this.emit.bind('end'));
|
61 | this.rest = new Buffer([]);
|
62 | this.frameMax = constants.FRAME_MIN_SIZE;
|
63 | this.sentSinceLastCheck = false;
|
64 | this.recvSinceLastCheck = false;
|
65 | }
|
66 | inherits(Frames, EventEmitter);
|
67 |
|
68 | var F = Frames.prototype;
|
69 |
|
70 | F.run = function() {
|
71 | var self = this;
|
72 |
|
73 | function go() {
|
74 | var f = self.recvFrame();
|
75 | while (f) {
|
76 | self.accept(f);
|
77 | f = self.recvFrame();
|
78 | }
|
79 | }
|
80 | this.stream.on('readable', go);
|
81 | go();
|
82 | };
|
83 |
|
84 | F.step = function() {
|
85 | var self = this;
|
86 | function recv() {
|
87 | var f = self.recvFrame();
|
88 | if (f) self.accept(f);
|
89 | else self.stream.once('readable', recv);
|
90 | }
|
91 | recv();
|
92 | };
|
93 |
|
94 | F.accept = function() {
|
95 | throw new Error("Intended to be provided by a subclass");
|
96 | };
|
97 |
|
98 |
|
99 |
|
100 | F.end = function() {
|
101 | this.stream.end();
|
102 | };
|
103 |
|
104 |
|
105 |
|
106 | F.checkSend = function() {
|
107 | var check = this.sentSinceLastCheck;
|
108 | this.sentSinceLastCheck = false;
|
109 | return check;
|
110 | }
|
111 |
|
112 | F.checkRecv = function() {
|
113 | var check = this.recvSinceLastCheck;
|
114 | this.recvSinceLastCheck = false;
|
115 | return check;
|
116 | }
|
117 |
|
118 | F.sendBytes = function(bytes) {
|
119 | this.sentSinceLastCheck = true;
|
120 | this.stream.write(bytes);
|
121 | };
|
122 |
|
123 | var HEARTBEAT_BUF = new Buffer([constants.FRAME_HEARTBEAT,
|
124 | 0, 0, 0, 0,
|
125 | 0, 0,
|
126 | constants.FRAME_END]);
|
127 |
|
128 | F.sendHeartbeat = function() {
|
129 | return this.sendBytes(HEARTBEAT_BUF);
|
130 | };
|
131 |
|
132 | F.sendMethod = function(channel, Method, fields) {
|
133 | var frame = encodeMethod(Method, channel, fields);
|
134 | this.sentSinceLastCheck = true;
|
135 | return this.stream.write(frame);
|
136 | };
|
137 |
|
138 | F.sendContent = function(channel, Properties, fields, body) {
|
139 | var writeResult = true;
|
140 | var headerFrame = encodeProperties(
|
141 | Properties, channel, body.length, fields);
|
142 |
|
143 | writeResult = this.stream.write(headerFrame);
|
144 |
|
145 | var maxBody = this.frameMax - FRAME_OVERHEAD;
|
146 | for (var offset = 0; offset < body.length; offset += maxBody) {
|
147 | var end = offset + maxBody;
|
148 | var slice = (end > body.length) ? body.slice(offset) : body.slice(offset, end);
|
149 | var bodyFrame = makeBodyFrame(channel, slice);
|
150 | writeResult = this.stream.write(bodyFrame);
|
151 | }
|
152 | this.sentSinceLastCheck = true;
|
153 | return writeResult;
|
154 | };
|
155 |
|
156 | var bodyCons =
|
157 | Bits.constructor(FRAME_BODY,
|
158 | 'channel:16, size:32, payload/binary',
|
159 | FRAME_END);
|
160 |
|
161 |
|
162 | function makeBodyFrame(channel, payload) {
|
163 | return bodyCons({channel: channel, size: payload.length, payload: payload});
|
164 | }
|
165 |
|
166 | var framePattern = Bits.compile('type:8, channel:16',
|
167 | 'size:32, payload:size/binary',
|
168 | FRAME_END, 'rest/binary');
|
169 | var methodPattern = Bits.compile('id:32, args/binary');
|
170 |
|
171 | F.recvFrame = function() {
|
172 |
|
173 | var frame = framePattern(this.rest);
|
174 | if (!frame) {
|
175 | var incoming = this.stream.read();
|
176 | if (incoming === null) {
|
177 | return false;
|
178 | }
|
179 | else {
|
180 | this.recvSinceLastCheck = true;
|
181 | this.rest = Buffer.concat([this.rest, incoming]);
|
182 | return this.recvFrame();
|
183 | }
|
184 | }
|
185 | else {
|
186 | this.rest = frame.rest;
|
187 | return decodeFrame(frame);
|
188 | }
|
189 | };
|
190 |
|
191 | var HEARTBEAT = {channel: 0};
|
192 |
|
193 |
|
194 | var headerPattern = Bits.compile('class:16',
|
195 | '_weight:16',
|
196 | 'size:64',
|
197 | 'flagsAndfields/binary');
|
198 |
|
199 | function decodeFrame(frame) {
|
200 | var payload = frame.payload;
|
201 | switch (frame.type) {
|
202 | case FRAME_METHOD:
|
203 | var idAndArgs = methodPattern(payload);
|
204 | var id = idAndArgs.id;
|
205 | var fields = decode(id, idAndArgs.args);
|
206 | return {id: id, channel: frame.channel, fields: fields};
|
207 | case FRAME_HEADER:
|
208 | var parts = headerPattern(payload);
|
209 | var id = parts['class'];
|
210 | var fields = decode(id, parts.flagsAndfields);
|
211 | return {id: id, channel: frame.channel,
|
212 | size: parts.size, fields: fields};
|
213 | case FRAME_BODY:
|
214 | return {channel: frame.channel, content: frame.payload};
|
215 | case FRAME_HEARTBEAT:
|
216 | return HEARTBEAT;
|
217 | default:
|
218 | throw new Error('Unknown frame type ' + frame.type);
|
219 | }
|
220 | }
|
221 |
|
222 | module.exports = Frames;
|
223 | module.exports.HEARTBEAT = HEARTBEAT;
|
224 | module.exports.HEARTBEAT_BUF = HEARTBEAT_BUF;
|