UNPKG

4.68 kBJavaScriptView Raw
1var url = require('url');
2var util = require('util');
3var ReadableStream = require('stream').Readable;
4var EventEmitter = require('events').EventEmitter;
5var rels = require('zetta-rels');
6
7var VirtualStream = module.exports = function(topic, socket, options) {
8 ReadableStream.call(this, options);
9 this._topic = topic;
10 this._socket = socket;
11 this.listener = null;
12};
13util.inherits(VirtualStream, ReadableStream);
14
15VirtualStream.prototype._read = function(size) {
16 var self = this;
17
18 if(!this.listener) {
19 this.listener = function(data) {
20 if(!self.push(data)) {
21 self._socket.unsubscribe(self._topic, self.listener);
22 self.listener = null;
23 }
24 };
25 this._socket.subscribe(this._topic);
26 this._socket.on(this._topic, this.listener);
27 }
28};
29
30var VirtualDevice = module.exports = function(entity, peerSocket) {
31 var self = this;
32 this._socket = peerSocket;
33 this._update(entity);
34
35 this._eventEmitter = new EventEmitter();
36 this.on = this._eventEmitter.on.bind(this._eventEmitter);
37
38 var logTopic = this._getTopic(this._getLinkWithTitle('logs'));
39 this._socket.subscribe(logTopic, function() {
40 self._eventEmitter.emit('ready');
41 });
42
43 this._socket.on(logTopic, function(data) {
44 self._update(data);
45 self._eventEmitter.emit(data.transition);
46 });
47
48
49 // setup streams
50 this.streams = {};
51
52 // add all object-stream
53 this._getLinksWithRel(rels.objectStream).forEach(function(monitor) {
54 var topic = self._getTopic(monitor);
55 // subscribe to topic
56 self._socket.subscribe(topic);
57 if(!self.streams[monitor.title]) {
58 self.streams[monitor.title] = new VirtualStream(self._getTopic(monitor), self._socket, { objectMode: true });
59 }
60 self._socket.on(topic, function(data) {
61 self[monitor.title] = data.data;
62 });
63 });
64
65 // add all binary-stream
66 this._getLinksWithRel(rels.binaryStream).forEach(function(monitor) {
67 var topic = self._getTopic(monitor);
68 if(!self.streams[monitor.title]) {
69 self.streams[monitor.title] = new VirtualStream(self._getTopic(monitor), self._socket, { objectMode: false });
70 }
71 self._socket.on(topic, function(data) {
72 self[monitor.title] = data.data;
73 });
74 });
75
76};
77
78VirtualDevice.prototype.createReadStream = function(name) {
79 var link = this._getLinkWithTitle(name);
80 return new VirtualStream(this._getTopic(link), this._socket, { objectMode: (link.rel.indexOf(rels.objectStream) > -1) });
81};
82
83VirtualDevice.prototype.call = function(/* transition, args, cb */) {
84 var self = this;
85 var args = Array.prototype.slice.call(arguments);
86 var transition = args[0];
87
88 var cb, transitionArgs;
89 if(typeof args[args.length - 1] === 'function') {
90 cb = args[args.length - 1];
91 transitionArgs = args.slice(1, args.length - 1);
92 } else {
93 transitionArgs = args.slice(1, args.length);
94 cb = function(err) {
95 throw err;
96 };
97 }
98
99 var action = this._getAction(transition);
100 if(!action) {
101 cb(new Error('Transition not available'));
102 return;
103 }
104
105 var actionArguments = this._encodeData(action, transitionArgs);
106
107 this._socket.transition(action, actionArguments, function(err, body) {
108 if(err) {
109 cb(err);
110 } else {
111 self._update(body);
112 cb();
113 }
114 });
115
116};
117
118VirtualDevice.prototype._encodeData = function(action, transitionArgs) {
119 var actionArguments = {};
120 action.fields.forEach(function(arg) {
121 if(arg.type === 'hidden') {
122 actionArguments[arg.name] = arg.value;
123 } else if(transitionArgs.length) {
124 actionArguments[arg.name] = transitionArgs.shift();
125 }
126 });
127
128 return actionArguments;
129};
130
131VirtualDevice.prototype._update = function(entity) {
132 var self = this;
133 Object.keys(entity.properties).forEach(function(prop) {
134 self[prop] = entity.properties[prop];
135 });
136 this._actions = entity.actions;
137
138 if(entity.links) {
139 this._links = entity.links;
140 }
141};
142
143VirtualDevice.prototype._getAction = function(name) {
144 var returnAction;
145 this._actions.some(function(action) {
146 if(action.name === name) {
147 returnAction = action;
148 return true;
149 }
150 });
151 return returnAction;
152};
153
154VirtualDevice.prototype._getLinkWithTitle = function(title) {
155 var returnLink;
156 this._links.some(function(link) {
157 if(link.title === title) {
158 returnLink = link;
159 return true;
160 }
161 });
162 return returnLink;
163};
164
165VirtualDevice.prototype._getTopic = function(link) {
166 var querystring = url.parse(link.href, true);
167 return querystring.query.topic;
168};
169
170VirtualDevice.prototype._getLinksWithRel = function(rel) {
171 var returnLinks = this._links.filter(function(link) {
172 return link.rel.indexOf(rel) !== -1;
173 });
174 return returnLinks;
175};