UNPKG

874 BJavaScriptView Raw
1var assert = require('assert');
2var PubSub = require('../lib/pubsub_service');
3var ConsumerStream = require('zetta-streams').ConsumerStream;
4
5describe('ConsumerStream', function() {
6 var stream = null;
7 var pubsub = null;
8
9 beforeEach(function(){
10 pubsub = new PubSub();
11 stream = new ConsumerStream('some-topic', {objectMode: true}, pubsub);
12 });
13
14 it('it should subscribe to pubsub topic', function() {
15 stream.on('data', function(){});
16 assert.equal(pubsub._listeners['some-topic'].length, 1);
17 });
18
19 it('it pass pubsub data to stream', function(done) {
20 var received = 0;
21 stream.on('data', function(msg){
22 assert.deepEqual(msg, {date: 0, data: 1});
23 received++;
24 });
25
26 setTimeout(function(){
27 assert.equal(received, 1);
28 done();
29 },2);
30
31 pubsub.publish('some-topic', {date: 0, data: 1});
32 });
33
34});