// UNPKG

// 5.41 kB · JavaScript · View Raw
1var url = require('url');
2var util = require('util');
3var ReadableStream = require('stream').Readable;
4var EventEmitter = require('events').EventEmitter;
5var rels = require('zetta-rels');
6var buildDeviceActions = require('./api_formats/siren/device.siren').buildActions;
7
/**
 * Readable stream fed by the messages a socket emits for a single topic.
 *
 * Note: the `module.exports` assignment below is superseded later in this
 * file, where `module.exports` is reassigned to VirtualDevice.
 *
 * @param {string} topic - Topic handed to the socket's subscribe/on/unsubscribe calls.
 * @param {Object} socket - Object exposing `subscribe`, `unsubscribe` and `on`
 *   (and, when available, `removeListener`).
 * @param {Object} [options] - Passed through unchanged to stream.Readable.
 */
var VirtualStream = module.exports = function(topic, socket, options) {
  ReadableStream.call(this, options);
  this._topic = topic;
  this._socket = socket;
  // Handler currently attached to the socket for this topic; null while detached.
  this.listener = null;
};
util.inherits(VirtualStream, ReadableStream);

/**
 * Readable#_read hook. Attaches a topic handler to the socket the first time
 * data is requested and leaves it attached until `push()` reports backpressure.
 *
 * @param {number} [size] - Unused; data arrives whenever the socket emits it.
 */
VirtualStream.prototype._read = function(size) {
  var self = this;

  // Already attached: incoming messages are being pushed as they arrive.
  if (this.listener) {
    return;
  }

  var listener = function(data) {
    if (self.push(data)) {
      return;
    }

    // Backpressure: detach this handler so it cannot keep pushing (or be
    // doubled up) once a later _read() attaches a fresh one.
    if (typeof self._socket.removeListener === 'function') {
      self._socket.removeListener(self._topic, listener);
    }
    self._socket.unsubscribe(self._topic, listener);

    // Only clear the slot if it still refers to this handler.
    if (self.listener === listener) {
      self.listener = null;
    }
  };

  this.listener = listener;
  this._socket.subscribe(this._topic);
  this._socket.on(this._topic, listener);
};
30
/**
 * Local stand-in for a device reachable through a peer socket.
 *
 * Copies the entity's properties onto the instance, follows the device's
 * "logs" topic to stay in sync, re-emits each log entry's transition name as
 * an event, and exposes one VirtualStream per stream link under `streams`.
 *
 * Events (register with `on`): 'ready' once the log subscription callback
 * first fires, every transition name seen on the log topic, and
 * 'remote-destroy' / 'destroy' after a 'zetta-device-destroy' transition.
 *
 * @param {Object} entity - Device entity with `properties`, `actions` and `links`.
 * @param {Object} peerSocket - Socket exposing `subscribe`, `on` and `ws`.
 */
var VirtualDevice = module.exports = function(entity, peerSocket) {
  var self = this;
  this._socket = peerSocket;
  this._update(entity);

  this._eventEmitter = new EventEmitter();
  this.on = this._eventEmitter.on.bind(this._eventEmitter);

  var logTopic = this._getTopic(this._getLinkWithTitle('logs'));

  // The subscribe callback has been seen firing again on disconnect, so
  // 'ready' is latched and emitted only for the first invocation.
  var readyEmitted = false;
  this._socket.subscribe(logTopic, function() {
    if (readyEmitted) {
      return;
    }
    readyEmitted = true;
    self._eventEmitter.emit('ready');
  });

  this._socket.on(logTopic, function(data) {
    // Format data.actions to siren action format
    data.actions = buildDeviceActions(data.properties.id, self._socket.ws._env, self._socket.ws._loader, data.transitions);
    delete data.transitions;

    self._update(data);
    self._eventEmitter.emit(data.transition);
  });

  // A destroy transition arriving on the log topic is re-published as the
  // two destroy events.
  self._eventEmitter.on('zetta-device-destroy', function() {
    self._eventEmitter.emit('remote-destroy', self);
    self._eventEmitter.emit('destroy');
  });

  // setup streams
  this.streams = {};

  // Object streams: subscribed immediately, and the latest value is mirrored
  // onto the instance under the stream's title.
  this._getLinksWithRel(rels.objectStream).forEach(function(monitor) {
    var topic = self._getTopic(monitor);
    self._socket.subscribe(topic);
    if (!self.streams[monitor.title]) {
      self.streams[monitor.title] = new VirtualStream(topic, self._socket, { objectMode: true });
    }
    self._socket.on(topic, function(data) {
      self[monitor.title] = data.data;
    });
  });

  // Binary streams: no subscribe call is made here; only a handler is
  // registered and the stream object is created.
  this._getLinksWithRel(rels.binaryStream).forEach(function(monitor) {
    var topic = self._getTopic(monitor);
    if (!self.streams[monitor.title]) {
      self.streams[monitor.title] = new VirtualStream(topic, self._socket, { objectMode: false });
    }
    self._socket.on(topic, function(data) {
      self[monitor.title] = data.data;
    });
  });
};
89
/**
 * Create a new VirtualStream for the stream link whose title is `name`.
 * The stream is in object mode when the link's `rel` contains the
 * object-stream relation.
 *
 * @param {string} name - Title of the stream link.
 * @returns {VirtualStream} A fresh stream bound to this device's socket.
 * @throws {Error} When the device has no link with that title.
 */
VirtualDevice.prototype.createReadStream = function(name) {
  var link = this._getLinkWithTitle(name);
  if (!link) {
    // Fail with the offending name instead of a TypeError on `link.href`.
    throw new Error('Stream not found: ' + name);
  }

  var isObjectStream = link.rel.indexOf(rels.objectStream) > -1;
  return new VirtualStream(this._getTopic(link), this._socket, { objectMode: isObjectStream });
};
94
/**
 * Invoke a transition on the remote device.
 *
 * Usage: call(transition, arg1, ..., argN[, cb]). A trailing function is
 * treated as the completion callback. Without one, any error is thrown.
 * The callback receives the error on failure and no arguments on success.
 */
VirtualDevice.prototype.call = function(/* transition, args, cb */) {
  var self = this;
  var params = Array.prototype.slice.call(arguments);
  var transition = params[0];
  var last = params[params.length - 1];
  var hasCallback = typeof last === 'function';

  var cb;
  if (hasCallback) {
    cb = last;
  } else {
    // No callback supplied: surface failures by throwing.
    cb = function(err) {
      if (err) {
        throw err;
      }
    };
  }
  var transitionArgs = params.slice(1, hasCallback ? params.length - 1 : params.length);

  var action = this._getAction(transition);
  if (!action) {
    cb(new Error('Transition not available'));
    return;
  }

  var payload = this._encodeData(action, transitionArgs);

  this._socket.transition(action, payload, function(err, body) {
    if (err) {
      cb(err);
      return;
    }
    // Refresh local state from the response before reporting success.
    self._update(body);
    cb();
  });
};
131
/**
 * Report whether the named transition is among the device's current actions.
 *
 * @param {string} transition - Transition (action) name.
 * @returns {boolean} True when `_getAction` finds a matching action.
 */
VirtualDevice.prototype.available = function(transition) {
  var action = this._getAction(transition);
  return Boolean(action);
};
135
/**
 * Build the argument map sent with a transition.
 *
 * Hidden fields contribute their own `value`. Every other field takes the
 * next unused entry of `transitionArgs`, in field order, and is left out once
 * the supplied arguments run out. `transitionArgs` is read, never modified.
 *
 * @param {Object} action - Action description; `fields` may be absent.
 * @param {Array} transitionArgs - Positional arguments supplied by the caller.
 * @returns {Object} Map of field name to value.
 */
VirtualDevice.prototype._encodeData = function(action, transitionArgs) {
  var actionArguments = {};
  var fields = action.fields || [];
  var supplied = transitionArgs || [];
  var next = 0; // index of the next positional argument to hand out

  fields.forEach(function(field) {
    if (field.type === 'hidden') {
      actionArguments[field.name] = field.value;
    } else if (next < supplied.length) {
      actionArguments[field.name] = supplied[next];
      next += 1;
    }
  });

  return actionArguments;
};
148
/**
 * Refresh local state from an entity.
 *
 * Each entry of `entity.properties` is copied onto the instance under the
 * same name, the action list is replaced, and the link list is replaced only
 * when the entity carries one.
 *
 * @param {Object} entity - Entity with optional `properties`, `actions`, `links`.
 */
VirtualDevice.prototype._update = function(entity) {
  var self = this;
  // An entity may omit `properties`; treat that as nothing to copy.
  var properties = entity.properties || {};
  Object.keys(properties).forEach(function(prop) {
    self[prop] = properties[prop];
  });

  // Always keep an array so action lookups stay safe when none are supplied.
  this._actions = entity.actions || [];

  if (entity.links) {
    this._links = entity.links;
  }
};
160
/**
 * Find the first current action with the given name.
 *
 * @param {string} name - Action name to look up.
 * @returns {Object|undefined} The matching action, or undefined when there is
 *   none (including when no action list has been stored).
 */
VirtualDevice.prototype._getAction = function(name) {
  var actions = this._actions || [];
  return actions.filter(function(action) {
    return action.name === name;
  })[0];
};
171
/**
 * Find the first link whose `title` equals the given title.
 *
 * @param {string} title - Link title to look up.
 * @returns {Object|undefined} The matching link, or undefined when there is
 *   none (including when no link list has been stored).
 */
VirtualDevice.prototype._getLinkWithTitle = function(title) {
  var links = this._links || [];
  return links.filter(function(link) {
    return link.title === title;
  })[0];
};
182
/**
 * Extract the `topic` query parameter from a link's href.
 *
 * @param {Object} link - Link object with an `href` string.
 * @returns {*} The parsed `topic` query value, or undefined when the href
 *   has no such parameter.
 */
VirtualDevice.prototype._getTopic = function(link) {
  // Second argument `true` makes url.parse return the query as an object.
  return url.parse(link.href, true).query.topic;
};
187
/**
 * Collect every link whose `rel` contains the given relation.
 *
 * @param {string} rel - Relation to look for via `link.rel.indexOf`.
 * @returns {Array} Matching links in their original order; empty when no
 *   link list has been stored. Links without a `rel` are skipped.
 */
VirtualDevice.prototype._getLinksWithRel = function(rel) {
  var links = this._links || [];
  return links.filter(function(link) {
    return Boolean(link.rel) && link.rel.indexOf(rel) !== -1;
  });
};