1 | var Frames = require('../lib/frame');
|
2 | var PassThrough =
|
3 | require('stream').PassThrough ||
|
4 | require('readable-stream/passthrough');
|
5 | var defer = require('when').defer;
|
6 | var defs = require('../lib/defs');
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 |
|
19 |
|
20 |
|
21 |
|
22 |
|
23 |
|
24 |
|
25 |
|
/**
 * Create two in-memory PassThrough streams wired back to back, standing in
 * for the two ends of a socket.
 *
 * Writing to one end pushes the chunk onto the readable side of the other
 * end; ending one end pushes the optional final chunk followed by `null`
 * (end-of-stream) onto the other end.
 *
 * @returns {{client: PassThrough, server: PassThrough}} the two ends.
 */
function socketPair() {
  var serverEnd = new PassThrough();
  var clientEnd = new PassThrough();

  // Redirect `local`'s writable-side methods onto `remote`'s readable side.
  function crossWire(local, remote) {
    local.write = remote.push.bind(remote);
    local.end = function (chunk, encoding) {
      // Only a truthy final chunk is forwarded before signalling EOF.
      if (chunk) {
        remote.push(chunk, encoding);
      }
      remote.push(null);
    };
  }

  crossWire(serverEnd, clientEnd);
  crossWire(clientEnd, serverEnd);

  return {client: clientEnd, server: serverEnd};
}
|
40 |
|
/**
 * Wrap `socket` in a `Frames` instance and hand the caller two helpers for
 * scripting the server side of an exchange.
 *
 * `run` is invoked synchronously as `run(send, awaitMethod)`:
 *
 *  - `send(id, fields, channel, content)` sends a frame on `channel`
 *    (defaulting to 0). When `id` is falsy and `content` is truthy it calls
 *    `frames.sendContent` with `defs.BasicProperties`; otherwise it calls
 *    `frames.sendMethod`.
 *  - `awaitMethod(method)` returns a function which, when called, installs a
 *    one-shot `frames.accept` handler, calls `frames.step()` and returns a
 *    promise. With a truthy `method`, the promise resolves with the frame if
 *    `frame.id === method` and is rejected with an Error otherwise; with no
 *    `method`, it resolves with whatever is passed to `frames.accept`.
 *
 * @param {Object} socket passed straight to the `Frames` constructor.
 * @param {Function} run callback receiving `(send, awaitMethod)`.
 * @returns {Frames} the `Frames` instance wrapping `socket`.
 */
function runServer(socket, run) {
  var frames = new Frames(socket);

  function send(id, fields, channel, content) {
    channel = channel || 0;
    if (!id && content) {
      frames.sendContent(channel, defs.BasicProperties, fields, content);
    }
    else {
      frames.sendMethod(channel, id, fields);
    }
  }

  // Deliberately NOT named `await`: that identifier is reserved in ES module
  // code and inside async functions. Callers receive this helper
  // positionally, so the local name is not part of the interface.
  function awaitMethod(method) {
    return function() {
      var d = defer();
      if (method) {
        frames.accept = function(f) {
          if (f.id === method) {
            d.resolve(f);
          }
          else {
            d.reject(new Error("Expected method: " + method +
                               ", got " + f.id));
          }
        };
      }
      else {
        frames.accept = d.resolve.bind(d);
      }
      frames.step();
      return d.promise;
    };
  }

  run(send, awaitMethod);
  return frames;
}
|
76 |
|
77 |
|
/**
 * Adapt a test-completion callback for use as a success handler.
 *
 * @param {Function} done completion callback.
 * @returns {Function} a function that ignores its own arguments and calls
 *   `done` with none.
 */
function succeed(done) {
  var onSuccess = function () {
    // Drop whatever value we were handed; `done` must see no arguments.
    done();
  };
  return onSuccess;
}
|
81 |
|
82 |
|
83 |
|
84 |
|
/**
 * Adapt a test-completion callback into a handler that always reports an
 * Error to `done`.
 *
 * If the handler is called with an Error, that Error is passed to `done`
 * unchanged. Any other value is wrapped in a new Error whose message is
 * "Expected to fail, instead got <value>".
 *
 * @param {Function} done completion callback, called with exactly one Error.
 * @returns {Function} handler taking the value or error to report.
 */
function fail(done) {
  return function(err) {
    if (err instanceof Error) {
      done(err);
    }
    else {
      // String(err) rather than err.toString(): the handler may be called
      // with undefined or null, where a method call would throw a TypeError
      // and `done` would never run.
      done(new Error("Expected to fail, instead got " + String(err)));
    }
  };
}
|
91 |
|
92 |
|
93 |
|
94 |
|
/**
 * Build a countdown callback that calls `done` at most once.
 *
 * The returned function calls `done(err)` the first time it receives an
 * Error, provided `done` has not already been called. Every other call
 * decrements the counter, and `done()` is called with no arguments when the
 * counter reaches exactly zero. After `done` has been called once, further
 * calls have no visible effect.
 *
 * @param {number} count number of non-error calls to wait for.
 * @param {Function} done completion callback.
 * @returns {Function} the countdown callback, taking an optional error.
 */
function latch(count, done) {
  var remaining = count;
  var finished = false;

  return function (err) {
    var failedFirst = !finished && err instanceof Error;
    if (failedFirst) {
      finished = true;
      done(err);
      return;
    }

    // Reached for non-errors, and for errors arriving after completion.
    remaining -= 1;
    if (finished || remaining !== 0) {
      return;
    }
    finished = true;
    done();
  };
}
|
112 |
|
// Public interface of this module: the helper functions defined above,
// exported under their own names.
module.exports = {
  socketPair: socketPair,
  runServer: runServer,
  succeed: succeed,
  fail: fail,
  latch: latch
};
|