UNPKG

6.19 kBJavaScriptView Raw
1// The river sweeps through
2// Silt and twigs, gravel and leaves
3// Driving the wheel on
4
5/*
6 Frame format:
7
8 0 1 3 7 size+7 size+8
9 +------+---------+-------------+ +------------+ +-----------+
10 | type | channel | size | | payload | | frame-end |
11 +------+---------+-------------+ +------------+ +-----------+
12 octet short long size octets octet
13
14 In general I want to know those first three things straight away, so I
15 can discard frames early.
16
17*/
18
19var defs = require('./defs');
20var constants = defs.constants;
21var decode = defs.decode;
22var encodeMethod = defs.encodeMethod;
23var encodeProperties = defs.encodeProperties;
24
25var FRAME_METHOD = constants.FRAME_METHOD,
26FRAME_HEARTBEAT = constants.FRAME_HEARTBEAT,
27FRAME_HEADER = constants.FRAME_HEADER,
28FRAME_BODY = constants.FRAME_BODY,
29FRAME_END = constants.FRAME_END;
30
31var Bits = require('bitsyntax');
32var Stream = require('stream');
33var Duplex =
34 require('stream').Duplex ||
35 require('readable-stream/duplex');
36var EventEmitter = require('events').EventEmitter;
37var inherits = require('util').inherits;
38
39var FRAME_OVERHEAD = 8;
40
41function wrapStream(s) {
42 if (s instanceof Duplex) return s;
43 else {
44 var ws = new Duplex();
45 ws.wrap(s); //wraps the readable side of things
46 ws._write = function(chunk, encoding, callback) {
47 return s.write(chunk, encoding, callback);
48 };
49 return ws;
50 }
51}
52
53/*
54 Sending and receiving frames, given a duplex byte stream
55*/
56function Frames(stream) {
57 EventEmitter.call(this);
58 this.stream = wrapStream(stream);
59 // %% not sure of the utility of forwarding this event
60 this.stream.once('end', this.emit.bind('end'));
61 this.rest = new Buffer([]);
62 this.frameMax = constants.FRAME_MIN_SIZE;
63 this.sentSinceLastCheck = false;
64 this.recvSinceLastCheck = false;
65}
66inherits(Frames, EventEmitter);
67
68var F = Frames.prototype;
69
70F.run = function() {
71 var self = this;
72
73 function go() {
74 var f = self.recvFrame();
75 while (f) {
76 self.accept(f);
77 f = self.recvFrame();
78 }
79 }
80 this.stream.on('readable', go);
81 go();
82};
83
84F.step = function() {
85 var self = this;
86 function recv() {
87 var f = self.recvFrame();
88 if (f) self.accept(f);
89 else self.stream.once('readable', recv);
90 }
91 recv();
92};
93
94F.accept = function() {
95 throw new Error("Intended to be provided by a subclass");
96};
97
98// Call to signal that no more work will be done on this frame, erm,
99// stream.
100F.end = function() {
101 this.stream.end();
102};
103
104// low-level API
105
106F.checkSend = function() {
107 var check = this.sentSinceLastCheck;
108 this.sentSinceLastCheck = false;
109 return check;
110}
111
112F.checkRecv = function() {
113 var check = this.recvSinceLastCheck;
114 this.recvSinceLastCheck = false;
115 return check;
116}
117
118F.sendBytes = function(bytes) {
119 this.sentSinceLastCheck = true;
120 this.stream.write(bytes);
121};
122
123var HEARTBEAT_BUF = new Buffer([constants.FRAME_HEARTBEAT,
124 0, 0, 0, 0, // size = 0
125 0, 0, // channel = 0
126 constants.FRAME_END]);
127
128F.sendHeartbeat = function() {
129 return this.sendBytes(HEARTBEAT_BUF);
130};
131
132F.sendMethod = function(channel, Method, fields) {
133 var frame = encodeMethod(Method, channel, fields);
134 this.sentSinceLastCheck = true;
135 return this.stream.write(frame);
136};
137
138F.sendContent = function(channel, Properties, fields, body) {
139 var writeResult = true;
140 var headerFrame = encodeProperties(
141 Properties, channel, body.length, fields);
142 // I'll send the headers regardless
143 writeResult = this.stream.write(headerFrame);
144
145 var maxBody = this.frameMax - FRAME_OVERHEAD;
146 for (var offset = 0; offset < body.length; offset += maxBody) {
147 var end = offset + maxBody;
148 var slice = (end > body.length) ? body.slice(offset) : body.slice(offset, end);
149 var bodyFrame = makeBodyFrame(channel, slice);
150 writeResult = this.stream.write(bodyFrame);
151 }
152 this.sentSinceLastCheck = true;
153 return writeResult;
154};
155
156var bodyCons =
157 Bits.constructor(FRAME_BODY,
158 'channel:16, size:32, payload/binary',
159 FRAME_END);
160// %%% TESTME possibly better to cons the first bit and write the
161// second directly, in the absence of IO lists
162function makeBodyFrame(channel, payload) {
163 return bodyCons({channel: channel, size: payload.length, payload: payload});
164}
165
166var framePattern = Bits.compile('type:8, channel:16',
167 'size:32, payload:size/binary',
168 FRAME_END, 'rest/binary');
169var methodPattern = Bits.compile('id:32, args/binary');
170
171F.recvFrame = function() {
172 // %%% identifying invariants might help here?
173 var frame = framePattern(this.rest);
174 if (!frame) {
175 var incoming = this.stream.read();
176 if (incoming === null) {
177 return false;
178 }
179 else {
180 this.recvSinceLastCheck = true;
181 this.rest = Buffer.concat([this.rest, incoming]);
182 return this.recvFrame();
183 }
184 }
185 else {
186 this.rest = frame.rest;
187 return decodeFrame(frame);
188 }
189};
190
191var HEARTBEAT = {channel: 0}; // channel to make sure it gets
192 // dispatched properly
193
194var headerPattern = Bits.compile('class:16',
195 '_weight:16',
196 'size:64',
197 'flagsAndfields/binary');
198
199function decodeFrame(frame) {
200 var payload = frame.payload;
201 switch (frame.type) {
202 case FRAME_METHOD:
203 var idAndArgs = methodPattern(payload);
204 var id = idAndArgs.id;
205 var fields = decode(id, idAndArgs.args);
206 return {id: id, channel: frame.channel, fields: fields};
207 case FRAME_HEADER:
208 var parts = headerPattern(payload);
209 var id = parts['class'];
210 var fields = decode(id, parts.flagsAndfields);
211 return {id: id, channel: frame.channel,
212 size: parts.size, fields: fields};
213 case FRAME_BODY:
214 return {channel: frame.channel, content: frame.payload};
215 case FRAME_HEARTBEAT:
216 return HEARTBEAT;
217 default:
218 throw new Error('Unknown frame type ' + frame.type);
219 }
220}
221
222module.exports = Frames;
223module.exports.HEARTBEAT = HEARTBEAT;
224module.exports.HEARTBEAT_BUF = HEARTBEAT_BUF;